Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Combine a list of Observables and wait until all completed

TL;DR How to convert Task.whenAll(List<Task>) into RxJava?

My existing code uses Bolts to build up a list of asynchronous tasks and waits until all of those tasks finish before performing other steps. Essentially, it builds up a List<Task> and returns a single Task which is marked as completed when all tasks in the list complete, as per the example on the Bolts site.

I'm looking to replace Bolts with RxJava and I'm assuming this method of building up a list of async tasks (size not known in advance) and wrapping them all into a single Observable is possible, but I don't know how.

I've tried looking at merge, zip, concat etc... but can't get to work on the List<Observable> that I'd be building up as they all seem geared to working on just two Observables at a time if I understand the docs correctly.

I'm trying to learn RxJava and am still very new to it so forgive me if this is an obvious question or explained in the docs somewhere; I have tried searching. Any help would be much appreciated.

like image 601
Craig Russell Avatar asked Feb 12 '16 08:02

Craig Russell


People also ask

How do I combine multiple observables?

We can use the concat operator to take multiple Observables and return a new Observable that sequentially emits values from each Observable that were passed in. It works by subscribing to them one at a time and merging the results in the output Observable.

How do I combine two observables in Rxjava?

If you want to merge observables of different type you need to use Observable. zip : Observable<String> o1 = Observable. just("a", "b", "c"); Observable<Integer> o2 = Observable.

What is single Rxjava?

Single is an Observable that always emit only one value or throws an error. A typical use case of Single observable would be when we make a network call in Android and receive a response. Sample Implementation: The below code always emits a Single user object. We use a Single Observable and a Single Observer.

What is RX Observable?

There are two key types to understand when working with Rx: Observable represents any object that can get data from a data source and whose state may be of interest in a way that other objects may register an interest. An observer is any object that wishes to be notified when the state of another object changes.


3 Answers

You can use flatMap in case you have dynamic tasks composition. Something like this:

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            //execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            //wait, until all task are executed
            //be aware, all your observable should emit onComplete event
            //otherwise you will wait forever
            .toList()
            //could implement more intelligent logic. eg. check that everything is successful
            .map(results -> true);
}

Another good example of parallel execution

Note: I do not really know your requirements for error handling. For example, what to do if only one task fails. I think you should verify this scenario.

like image 149
MyDogTom Avatar answered Oct 21 '22 02:10

MyDogTom


It sounds like you're looking for the Zip operator.

There are a few different ways of using it, so let's look at an example. Say we have a few simple observables of different types:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

The simplest way to wait for them all is something like this:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

Note that in the zip function, the parameters have concrete types that correspond to the types of the observables being zipped.

Zipping a list of observables is also possible, either directly:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

...or by wrapping the list into an Observable<Observable<?>>:

Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

However, in both of these cases, the zip function can only accept a single Object[] parameter since the types of the observables in the list are not known in advance as well as their number. This means that that the zip function would have to check the number of parameters and cast them accordingly.

Regardless, all of the above examples will eventually print 1 Blah true

EDIT: When using Zip, make sure that the Observables being zipped all emit the same number of items. In the above examples all three observables emitted a single item. If we were to change them to something like this:

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

Then 1, Blah, True and 2, Hello, True would be the only items passed into the zip function(s). The item 3would never be zipped since the other observables have completed.

like image 77
Malt Avatar answered Oct 21 '22 02:10

Malt


Of the suggestions proposed, zip() actually combines observable results with each other, which may or may not be what is wanted, but was not asked in the question. In the question, all that was wanted was execution of each of the operations, either one-by-one or in parallel (which was not specified, but linked Bolts example was about parallel execution). Also, zip() will complete immediately when any of the observables complete, so it's in violation of the requirements.

For parallel execution of Observables, flatMap() presented in the other answer is fine, but merge() would be more straight-forward. Note that merge will exit on error of any of the Observables, if you rather postpone the exit until all observables have finished, you should be looking at mergeDelayError().

For one-by-one, I think Observable.concat() static method should be used. Its javadoc states like this:

concat(java.lang.Iterable> sequences) Flattens an Iterable of Observables into one Observable, one after the other, without interleaving them

which sounds like what you're after if you don't want parallel execution.

Also, if you're only interested in the completion of your task, not return values, you should probably look into Completable instead of Observable.

TLDR: for one-by-one execution of tasks and oncompletion event when they are completed, I think Completable.concat() is best suited. For parallel execution, Completable.merge() or Completable.mergeDelayError() sounds like the solution. The former one will stop immediately on any error on any completable, the latter one will execute them all even if one of them has an error, and only then reports the error.

like image 20
eis Avatar answered Oct 21 '22 02:10

eis