My scenario is quite simple, but I don't seem to able to find it anywhere.
I have a set of elements that I want to iterate through, for each one call an async function, and then wait for all of them to finish (which again happens in an asynchronous fashion, implemented in the logic of the function). I'm relatively new to RxJava and used to do this easily in NodeJS by passing callback to the function and wait at the end. Here's the pseudocode of what I need (the iterator for elements does not need to be synchronous nor ordered):
for(line in lines){
callAsyncFunction(line);
}
WAIT FOR ALL OF THEM TO FINISH
Your help is really appreciated!
Use Rx:
Observable
.from(lines)
.flatMap(line -> callAsyncFunctionThatReturnsObservable(line).subscribeOn(Schedulers.io())
.ignoreElements();
At this point, depending on what you want to do you can use an .switchIfEmpty(...)
to subscribe to another observable.
Well technically if you think about it, what you need to do is create an Observable from all of your elements, and then zip them together to continue your stream's execution.
That would in pseudocode give you something like this:
List<Observable<?>> observables = new ArrayList<>();
for(line in lines){
observables.add(Observable.fromCallable(callAsyncFunction(line));
}
Observable.zip(observables, new Function<...>() { ... }); // kinda like Promise.all()
But it might come as no surprise that Observable.from()
can expose every element within an iterable as a stream of objects, thus eliminating your need for a loop. So you could create a new Observable that calls onCompleted()
when the async operation is complete, using Observable.fromCallable()
. Afterwards, you can wait for these new Observables by collecting them into a list.
Observable.from(lines)
.flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String line) {
return Observable.fromCallable(callAsyncFunction(line)); // returns Callable
}
}).toList()
.map(new Func1<List<Object>, Object>() {
@Override
public Object call(List<Object> ignored) {
// do something;
}
});
I'm basing this second half of my answer heavily on this answer.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With