Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Wait for multiple async calls to finish in RxJava

My scenario is quite simple, but I don't seem to able to find it anywhere.

I have a set of elements that I want to iterate through, for each one call an async function, and then wait for all of them to finish (which again happens in an asynchronous fashion, implemented in the logic of the function). I'm relatively new to RxJava and used to do this easily in NodeJS by passing callback to the function and wait at the end. Here's the pseudocode of what I need (the iterator for elements does not need to be synchronous nor ordered):

for(line in lines){
 callAsyncFunction(line);
}
WAIT FOR ALL OF THEM TO FINISH

Your help is really appreciated!

like image 506
qibra Avatar asked Sep 15 '16 18:09

qibra


2 Answers

Use Rx:

Observable
.from(lines)
.flatMap(line -> callAsyncFunctionThatReturnsObservable(line).subscribeOn(Schedulers.io())
.ignoreElements();

At this point, depending on what you want to do you can use an .switchIfEmpty(...) to subscribe to another observable.

like image 52
Tassos Bassoukos Avatar answered Oct 08 '22 13:10

Tassos Bassoukos


Well technically if you think about it, what you need to do is create an Observable from all of your elements, and then zip them together to continue your stream's execution.

That would in pseudocode give you something like this:

List<Observable<?>> observables = new ArrayList<>();
for(line in lines){
   observables.add(Observable.fromCallable(callAsyncFunction(line));
}
Observable.zip(observables, new Function<...>() { ... }); // kinda like Promise.all()

But it might come as no surprise that Observable.from() can expose every element within an iterable as a stream of objects, thus eliminating your need for a loop. So you could create a new Observable that calls onCompleted() when the async operation is complete, using Observable.fromCallable(). Afterwards, you can wait for these new Observables by collecting them into a list.

Observable.from(lines)
   .flatMap(new Func1<String, Observable<?>>() {
        @Override
        public Observable<?> call(String line) {
            return Observable.fromCallable(callAsyncFunction(line)); // returns Callable
        }
    }).toList()
      .map(new Func1<List<Object>, Object>() {
        @Override
        public Object call(List<Object> ignored) {
            // do something;
        }
    });

I'm basing this second half of my answer heavily on this answer.

like image 32
EpicPandaForce Avatar answered Oct 08 '22 13:10

EpicPandaForce