Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RX: Run Zipped Observables in parallel?

So I'm playing around with RX (really cool), and I've been converting my api that accesses a sqlite database in Android to return observables.

So naturally one of the problems I started to try to solve is, "What if I want to make 3 API calls, get the results, and then do some processing once they are all finished?"

It took me an hour or 2, but I eventually found the Zip Functionality and it helps me out handily:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(value);
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
}

Great! So that's cool.

So when I zip up the 3 observables they run in serial. What if I want them all to run in parallel at the same time so I end up getting the results faster? I've played around with a few things, and even tried reading some of the original RX stuff people have written in C#. I'm sure there is a simple answer. Can anyone point me in the right direction? What is the proper way to do this?

like image 832
spierce7 Avatar asked Jan 18 '14 06:01

spierce7


People also ask

How Observable zip works?

Zip operator strictly pairs emitted items from observables. It waits for both (or more) items to arrive then merges them. So yes this would be suitable for your needs. I would use Func2 to chain the result from the first two observables.

What does zip do RxJS?

RxJS implements this operator as zip and zipArray . zip accepts a variable number of Observables or Promises as parameters, followed by a function that accepts one item emitted by each of those Observables or resolved by those Promises as input and produces a single item to be emitted by the resulting Observable.

What is Reactivex RxJava?

RxJava is a Java library that enables Functional Reactive Programming in Android development. It raises the level of abstraction around threading in order to simplify the implementation of complex concurrent behavior.


1 Answers

zip does run the observables in parallel - but it also subscribes to them serially. Since your getNumberedObservable is completing in the subscription method it gives the impression of running serially, but there is in fact no such limitation.

You can either try with some long running Observables that outlive their subscription logic, such as timer, or use the subscribeOn method to subscribe asynchronously to each stream passed to zip.

like image 163
James World Avatar answered Sep 23 '22 19:09

James World