Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How does task cancellation work in RxJava?

Tags:

java

rx-java

I'm unclear on how to implement task cancellation in RXJava.

I'm interested in porting an existing API built using Guava's ListenableFuture. My use case is as follows:

  • I have an single operation that's composed of a sequence of futures joined by Futures.transform()
  • Multiple subscribers observe the operation's final future.
  • Each observer can cancel the final future, and all observers witness the cancellation event.
  • Cancellation of the final future results in the cancellation of its dependencies, e.g. in sequence 1->2->3, cancellation of 3 is propagated to 2, and so on.

There's very little info in the RxJava wiki about this; the only references I can find to cancellation mention Subscription as an equivalent to .NET's Disposable, but as far as I can see, Subscription only offers the ability to unsubscribe from subsequent values in the sequence.

I'm unclear on how to implement "any subscriber can cancel" semantics through this API. Am I thinking about this in the wrong way?

Any input would be appreciated.

like image 672
user3452758 Avatar asked Aug 16 '14 21:08

user3452758


1 Answers

It's important to learn about Cold vs Hot Observables. If your Observables are cold, then their operations will not execute if you have no subscribers. Hence to "cancel", just make sure all Observers unsubscribe from the source Observable.

However, if only one Observer of the source unsubscribes, and there are other Observers still subscribed to the source, this will not incur a "cancelling". In that case you can use (but it's not the only solution) ConnectableObservables. Also see this link about Rx.NET.

A practical way of using ConnectableObservables is to simply call .publish().refCount() on any cold Observable. What that does is create one single "proxy" Observer which relays the events from the source to the actual Observers. The proxy Observer unsubscribes when the last actual Observer unsubscribes.

To manually control a ConnectableObservable, call just coldSource.publish() and you will get an instance of ConnectableObservable. Then you can call .connect() which will return you the Subscription of the "proxy" Observer. To manually "cancel" the source, you just unsubscribe the Subscription of the proxy Observer.


For your specific problem, you can also use the .takeUntil() operator.

Suppose your "final future" is ported as finalStream in RxJava, and suppose that "cancel events" are Observables cancelStream1, cancelStream2, etc, then it becomes fairly simple to "cancel" operations resulting from finalStream:

Observable<FooBar> finalAndCancelableStream = finalStream
    .takeUntil( Observable.merge(cancelStream1, cancelStream2) );

In diagrams, this is how takeUntil works, and this is how merge works.

In plain english, you can read it as "finalAndCancelableStream is the finalStream until either cancelStream1 or cancelStream2 emit an event".

like image 192
André Staltz Avatar answered Oct 21 '22 07:10

André Staltz