
RxJava using zip to chain multiple calls

I'm trying to construct an observable from a chain of API calls, but I can't get it to work. I have four API calls, ApiA, ApiB, ApiC and ApiD, that return Observables using RxJavaCallAdapterFactory. ApiA and ApiB have to be called first; after both have completed, ApiC and ApiD should be called. After the last two have completed, the View is initialized. I'm using the zip operator to wait for the calls to finish. I'm not sure that's the way to go, but I'm quite new to RxJava, so if there is a better way to do this, please let me know. Below is my code, with comments showing where I got stuck:

public Observable syncData() {
    return Observable.zip(
        // these two calls are executed
        callApiA(),
        callApiB(),
        (o, o2) -> Observable.zip(
            /* these two calls are not executed, it seems as if this zip has 
            no subscriber but i don't know why ... */
            callApiC(),
            callApiD(),
            (o, o2) -> {
                someLogic();
                return Observable.empty();
            }));
}

And in the view I just subscribe to this method:

viewModel.syncData().subscribe(
            o -> mainAdapter.update(),
            throwable -> throwable.printStackTrace()
);

Again, I'm fairly new to RxJava, so any help would be appreciated. Thanks.

TheJudge asked Nov 18 '16 11:11



2 Answers

You are on the right track.

However, be aware that the last parameter of .zip, a Func, does not return Observable<R> from its call function, but R directly.

So you are passing an Observable to the Subscriber's onNext, not the item that Observable would emit.

A flatMap in between is needed:

Observable.zip(callApiA(), callApiB(), Pair::new)
     .flatMap((pair) -> Observable.zip(
           Observable.just(pair.first()),
           Observable.just(pair.second()),
           callApiC(),
           callApiD(),
           (t1, t2, t3, t4) -> {
                 // return the combined result R itself, not an Observable<R>
                 return someLogic();
           }));

This is pseudocode, but I hope you get the idea:

  1. Zip the first 2 calls (A, B)
  2. Return a Pair of the objects emitted by those calls
  3. Transform the Pair with the flatMap operator, applying a second zip inside it
  4. Compose this Pair with the next 2 calls (C, D)
  5. Return the composed object from the someLogic() function (remembering that someLogic should return R and not Observable<R>)

Hint for the future: try writing it without lambda expressions first. Then you can see the input and output parameter types, which makes the code easier to write. Afterwards, rewrite the same code with lambda expressions.
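To illustrate the hint, here is a minimal sketch that uses the standard library's java.util.function.BiFunction as a stand-in for RxJava's Func2, so that it compiles without the library. The class name CombinerTypes and the String payloads are made up for illustration. The point is only that the anonymous-class form spells out the return type, so a combiner that wraps its result in something deferred (the analogue of returning Observable<R>) is visible in the signature.

```java
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical demo class; BiFunction stands in for RxJava's Func2.
final class CombinerTypes {
    // Written out in full: the third type argument is the declared return type.
    // This combiner returns a plain String (the "R" case).
    static final BiFunction<String, String, String> PLAIN =
            new BiFunction<String, String, String>() {
                @Override
                public String apply(String a, String b) {
                    return a + b;
                }
            };

    // Here the return type is a deferred computation, the analogue of returning
    // Observable<R> from the zip function. The nesting shows up in the signature.
    static final BiFunction<String, String, Supplier<String>> WRAPPED =
            new BiFunction<String, String, Supplier<String>>() {
                @Override
                public Supplier<String> apply(String a, String b) {
                    return () -> a + b;
                }
            };

    // Lambda form of PLAIN: same behavior, but the types are no longer spelled out.
    static final BiFunction<String, String, String> PLAIN_LAMBDA = (a, b) -> a + b;
}
```

With the types written out, the caller of WRAPPED has to call get() on the result before any value exists, which is the same extra step that flatMap performs for a nested Observable.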

R. Zagórski answered Oct 21 '22 03:10



The important part is that Observables are not executed until they are subscribed to.

To explain what is happening: the first Observable.zip is given a combination function of the form (resultOfApiA, resultOfApiB) -> Observable<Something>, so it produces an Observable<Observable<Something>>. The inner observable is emitted as an element and never subscribed to.

To fix it, you need to flatten the observable stream: Observable.zip(...).flatMap(s -> s). This flattens the stream to Observable<Something> and subscribes to the inner observable.
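The behavior can be reproduced without RxJava using a toy model. The sketch below is not RxJava: Cold, ColdDemo, call and the log list are all hypothetical names, and Cold is a single-value stand-in for a cold Observable that runs its sources one after another instead of concurrently. It only demonstrates the principle that deferred work nested inside another deferred result does not run until something subscribes to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy stand-in for a cold, single-value Observable: nothing runs until subscribe().
final class Cold<T> {
    private final Supplier<T> work;

    private Cold(Supplier<T> work) { this.work = work; }

    static <T> Cold<T> fromCall(Supplier<T> work) { return new Cold<>(work); }

    // Runs the deferred work and returns its value (the analogue of subscribing).
    T subscribe() { return work.get(); }

    // Analogue of zip: on subscribe, runs both sources and combines their values.
    static <A, B, R> Cold<R> zip(Cold<A> a, Cold<B> b, BiFunction<A, B, R> combiner) {
        return new Cold<>(() -> combiner.apply(a.subscribe(), b.subscribe()));
    }

    // Analogue of flatMap: on subscribe, also subscribes to the Cold the mapper returns.
    <R> Cold<R> flatMap(Function<T, Cold<R>> mapper) {
        return new Cold<>(() -> mapper.apply(subscribe()).subscribe());
    }
}

final class ColdDemo {
    // Records which hypothetical API calls actually ran.
    static final List<String> log = new ArrayList<>();

    static Cold<String> call(String name) {
        return Cold.fromCall(() -> { log.add(name); return name; });
    }

    // Same shape as the question: the combiner returns another Cold.
    static Cold<Cold<String>> nested() {
        return Cold.zip(call("A"), call("B"),
                (a, b) -> Cold.zip(call("C"), call("D"), (c, d) -> c + d));
    }

    // Subscribes to the outer Cold only; the inner one is produced but never run.
    static List<String> runNested() {
        log.clear();
        nested().subscribe();
        return new ArrayList<>(log);
    }

    // Flattens first, so subscribing also runs the inner Cold.
    static List<String> runFlattened() {
        log.clear();
        Cold<String> flat = nested().flatMap(inner -> inner);
        flat.subscribe();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(runNested());    // [A, B]
        System.out.println(runFlattened()); // [A, B, C, D]
    }
}
```

In this model, runNested() matches the symptom in the question (only A and B run), while runFlattened() runs all four calls in the intended order.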

Kiskae answered Oct 21 '22 04:10
