I'm unclear on how to implement task cancellation in RxJava.
I'm interested in porting an existing API built using Guava's ListenableFuture. My use case is as follows: futures are chained with Futures.transform() in a sequence 1 -> 2 -> 3, and cancellation of 3 is propagated to 2, and so on.

There's very little info in the RxJava wiki about this; the only references I can find to cancellation mention Subscription as an equivalent to .NET's Disposable, but as far as I can see, Subscription only offers the ability to unsubscribe from subsequent values in the sequence.
I'm unclear on how to implement "any subscriber can cancel" semantics through this API. Am I thinking about this in the wrong way?
Any input would be appreciated.
It's important to learn about Cold vs Hot Observables. If your Observables are cold, then their operations will not execute if you have no subscribers. Hence to "cancel", just make sure all Observers unsubscribe from the source Observable.
However, if only one Observer of the source unsubscribes while other Observers are still subscribed, the source is not cancelled. In that case you can use ConnectableObservables (though that is not the only solution). Also see this link about Rx.NET.
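To make the cold-Observable behaviour concrete, here is a minimal sketch. It assumes RxJava 1.x (the `rx.*` packages, where `Subscription.unsubscribe()` is the cancellation hook) and Java 8 lambdas; the class name `ColdUnsubscribeDemo` and the use of `Observable.never()` as a stand-in for a long-running operation are illustrative choices, not part of the original question.

```java
import rx.Observable;
import rx.Subscription;

import java.util.concurrent.atomic.AtomicInteger;

class ColdUnsubscribeDemo {
    /** Returns {startedBeforeSubscribe, startedAfterTwoSubscribes, cancelledAfterA, cancelledAfterB}. */
    static int[] run() {
        final AtomicInteger started = new AtomicInteger();
        final AtomicInteger cancelled = new AtomicInteger();

        // Hypothetical stand-in for a long-running cold operation: it never emits,
        // and we only count how often it is started and cancelled.
        Observable<Long> cold = Observable.<Long>never()
                .doOnSubscribe(started::incrementAndGet)
                .doOnUnsubscribe(cancelled::incrementAndGet);

        int startedBefore = started.get();      // nothing has run yet: no subscribers

        Subscription a = cold.subscribe();      // first independent execution
        Subscription b = cold.subscribe();      // second independent execution
        int startedAfter = started.get();

        a.unsubscribe();                        // cancels only a's execution
        int cancelledAfterA = cancelled.get();
        b.unsubscribe();                        // now every execution is cancelled
        int cancelledAfterB = cancelled.get();

        return new int[] {startedBefore, startedAfter, cancelledAfterA, cancelledAfterB};
    }
}
```

In this sketch each subscriber drives its own execution of the source, so unsubscribing one Observer leaves the other running; sharing a single execution between Observers needs something like a ConnectableObservable.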
A practical way of using ConnectableObservables is to simply call .publish().refCount() on any cold Observable. That creates a single "proxy" Observer which relays the events from the source to the actual Observers. The proxy Observer unsubscribes when the last actual Observer unsubscribes.
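A small sketch of the .publish().refCount() approach, assuming RxJava 1.x and Java 8. The class name `RefCountCancelDemo` and the `Observable.never()` source are hypothetical placeholders for the real operation; the counter only records when the shared source is actually unsubscribed.

```java
import rx.Observable;
import rx.Subscription;

import java.util.concurrent.atomic.AtomicInteger;

class RefCountCancelDemo {
    /** Returns {sourceCancellationsAfterFirstUnsubscribe, sourceCancellationsAfterLastUnsubscribe}. */
    static int[] run() {
        final AtomicInteger cancelled = new AtomicInteger();

        // Hypothetical long-running source; doOnUnsubscribe fires when the
        // proxy Observer created by publish() lets go of it.
        Observable<Long> source = Observable.<Long>never()
                .doOnUnsubscribe(cancelled::incrementAndGet);

        // One shared subscription to the source, reference-counted by subscriber.
        Observable<Long> shared = source.publish().refCount();

        Subscription a = shared.subscribe();
        Subscription b = shared.subscribe();

        a.unsubscribe();                     // one Observer left: source keeps running
        int afterFirst = cancelled.get();

        b.unsubscribe();                     // last Observer gone: source is cancelled
        int afterLast = cancelled.get();

        return new int[] {afterFirst, afterLast};
    }
}
```

Note that this gives "the last subscriber cancels" semantics rather than "any subscriber cancels"; the source stays alive as long as at least one Observer remains.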
To control a ConnectableObservable manually, call just coldSource.publish() and you will get an instance of ConnectableObservable. Then call .connect(), which returns the Subscription of the "proxy" Observer. To manually "cancel" the source, unsubscribe that Subscription.
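The manual variant could look roughly like the following, again assuming RxJava 1.x and Java 8. `ManualConnectDemo` and the `Observable.never()` source are illustrative assumptions; `coldSource` mirrors the name used in the text.

```java
import rx.Observable;
import rx.Subscription;
import rx.observables.ConnectableObservable;

import java.util.concurrent.atomic.AtomicInteger;

class ManualConnectDemo {
    /** Returns {sourceCancellationsBeforeCancel, sourceCancellationsAfterCancel}. */
    static int[] run() {
        final AtomicInteger cancelled = new AtomicInteger();

        // Hypothetical long-running cold source.
        Observable<Long> coldSource = Observable.<Long>never()
                .doOnUnsubscribe(cancelled::incrementAndGet);

        ConnectableObservable<Long> published = coldSource.publish();

        // Two Observers attach to the proxy; nothing runs until connect().
        published.subscribe();
        published.subscribe();

        // connect() starts the source and hands back the proxy's Subscription.
        Subscription connection = published.connect();
        int before = cancelled.get();

        // Unsubscribing the connection cancels the source for every Observer,
        // even though both of them are still subscribed to the proxy.
        connection.unsubscribe();
        int after = cancelled.get();

        return new int[] {before, after};
    }
}
```

Whoever holds `connection` can cancel the shared operation, so handing that Subscription to each consumer is one way to approximate "any subscriber can cancel".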
For your specific problem, you can also use the .takeUntil() operator.
Suppose your "final future" is ported as finalStream in RxJava, and suppose that "cancel events" are Observables cancelStream1, cancelStream2, etc. Then it becomes fairly simple to "cancel" operations resulting from finalStream:
Observable<FooBar> finalAndCancelableStream = finalStream
.takeUntil( Observable.merge(cancelStream1, cancelStream2) );
In diagrams, this is how takeUntil works, and this is how merge works.
In plain English, you can read it as "finalAndCancelableStream is the finalStream until either cancelStream1 or cancelStream2 emits an event".
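A runnable sketch of the takeUntil approach, assuming RxJava 1.x and Java 8. Modelling finalStream and the cancel streams as PublishSubjects, the String element type, and the class name `TakeUntilCancelDemo` are assumptions made only so the example can be driven by hand.

```java
import rx.Observable;
import rx.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

class TakeUntilCancelDemo {
    /** Returns the events seen by a subscriber of the cancelable stream. */
    static List<String> run() {
        // Hypothetical sources, driven manually for the demonstration.
        PublishSubject<String> finalStream = PublishSubject.create();
        PublishSubject<Object> cancelStream1 = PublishSubject.create();
        PublishSubject<Object> cancelStream2 = PublishSubject.create();

        Observable<String> finalAndCancelableStream = finalStream
                .takeUntil(Observable.merge(cancelStream1, cancelStream2));

        final List<String> events = new ArrayList<>();
        finalAndCancelableStream.subscribe(
                value -> events.add("next:" + value),
                error -> events.add("error"),
                () -> events.add("completed"));

        finalStream.onNext("a");          // delivered
        cancelStream2.onNext("cancel");   // any cancel stream ends the sequence
        finalStream.onNext("b");          // ignored: the stream was already cancelled

        return events;
    }
}
```

Subscribers see a normal completion when a cancel event arrives, so if they need to tell cancellation apart from a regular end of the sequence, that distinction has to be carried some other way.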