So I'm playing around with RX (really cool), and I've been converting my api that accesses a sqlite database in Android to return observables.
So naturally one of the problems I started to try to solve is, "What if I want to make 3 API calls, get the results, and then do some processing once they are all finished?"
It took me an hour or 2, but I eventually found the Zip Functionality and it helps me out handily:
Observable<Integer> one = getNumberedObservable(1);
Observable<Integer> two = getNumberedObservable(2);
Observable<Integer> three = getNumberedObservable(3);
Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
@Override
public Integer call(Integer arg0, Integer arg1, Integer arg2) {
System.out.println("Zip0: " + arg0);
System.out.println("Zip1: " + arg1);
System.out.println("Zip2: " + arg2);
return arg0 + arg1 + arg2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer arg0) {
System.out.println("Zipped Result: " + arg0);
}
});
public static Observable<Integer> getNumberedObservable(final int value) {
return Observable.create(new OnSubscribeFunc<Integer>() {
@Override
public Subscription onSubscribe(Observer<? super Integer> observer) {
observer.onNext(value);
observer.onCompleted();
return Subscriptions.empty();
}
});
}
Great! So that's cool.
So when I zip up the 3 observables they run in serial. What if I want them all to run in parallel at the same time so I end up getting the results faster? I've played around with a few things, and even tried reading some of the original RX stuff people have written in C#. I'm sure there is a simple answer. Can anyone point me in the right direction? What is the proper way to do this?
Zip operator strictly pairs emitted items from observables. It waits for both (or more) items to arrive then merges them. So yes this would be suitable for your needs. I would use Func2 to chain the result from the first two observables.
RxJS implements this operator as zip and zipArray . zip accepts a variable number of Observables or Promises as parameters, followed by a function that accepts one item emitted by each of those Observables or resolved by those Promises as input and produces a single item to be emitted by the resulting Observable.
RxJava is a Java library that enables Functional Reactive Programming in Android development. It raises the level of abstraction around threading in order to simplify the implementation of complex concurrent behavior.
zip
does run the observables in parallel - but it also subscribes to them serially. Since your getNumberedObservable
is completing in the subscription method it gives the impression of running serially, but there is in fact no such limitation.
You can either try with some long running Observables that outlive their subscription logic, such as timer
, or use the subscribeOn
method to subscribe asynchronously to each stream passed to zip
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With