
How to wait for multiple nested async calls using RxJava-Android?

I'm new to RxJava. Here's my case:

  1. Send request A, which returns a List<A>.
  2. For each A, send request AA, which returns an AA; then bind that AA to its A.
  3. There are B and BB with the same logic.
  4. Do something only after all requests have completed.

Example:

request(url1, callback(List<A> listA) {
    for (A a : listA) {
        request(url2, callback(AA aa) {
            a.set(aa);
        });
    }
});

A and B are independent of each other.

How should I structure the code? I also use Retrofit as the network client.

fifth asked Oct 14 '14 09:10



1 Answer

OK, I think this should solve the first part of your problem:

Notice that the second call to flatMap is given two arguments: there is an overload of flatMap that not only produces an Observable for each input item but also takes a second function, which combines each item from the resulting Observable with the input item that produced it.

Have a look at the third graphic under this heading to get an intuitive understanding:

https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#flatmap-concatmap-and-flatmapiterable

Observable<A> observableOfAs = retrofitClient.getListOfAs()
// Flatten the single List<A> into a stream of individual As.
.flatMap(new Func1<List<A>, Observable<A>>() {

    @Override
    public Observable<A> call(List<A> listOfAs) {
        return Observable.from(listOfAs);
    }

})
// Request the AA for each A ...
.flatMap(new Func1<A, Observable<AA>>() {

    @Override
    public Observable<AA> call(A someA) {
        return retrofitClient.getTheAaForMyA(someA);
    }

},
// ... and bind it to the A that triggered the request.
new Func2<A, AA, A>() {

    @Override
    public A call(A someA, AA theAaforMyA) {
        // Works whether or not set() returns the A itself.
        someA.set(theAaforMyA);
        return someA;
    }

})
...
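
To make the two-argument flatMap more concrete, here is a minimal sketch of what it computes, written as a plain synchronous helper with only the Java standard library instead of RxJava. The names FlatMapWithCombinerSketch, flatMapAndCombine, fakeRequestAa and bindAll, as well as the fields of A and AA, are hypothetical and exist only for illustration; the real operator works asynchronously on Observables.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

class FlatMapWithCombinerSketch {

    // Hypothetical stand-ins for the A and AA types from the question.
    static class A {
        final String id;
        AA aa;
        A(String id) { this.id = id; }
        void set(AA aa) { this.aa = aa; }
    }

    static class AA {
        final String detail;
        AA(String detail) { this.detail = detail; }
    }

    // Synchronous analogue of flatMap(collectionSelector, resultSelector):
    // for every source item, produce its inner items, then combine each
    // inner item with the source item that produced it.
    static <T, U, R> List<R> flatMapAndCombine(
            List<T> source,
            Function<T, List<U>> collectionSelector,
            BiFunction<T, U, R> resultSelector) {
        List<R> result = new ArrayList<>();
        for (T item : source) {
            for (U inner : collectionSelector.apply(item)) {
                result.add(resultSelector.apply(item, inner));
            }
        }
        return result;
    }

    // Assumption: replaces the network call getTheAaForMyA with canned data.
    static List<AA> fakeRequestAa(A a) {
        return Collections.singletonList(new AA("detail-for-" + a.id));
    }

    // Binds each AA to the A that triggered its request and passes the A on.
    static List<A> bindAll(List<A> listOfAs) {
        return flatMapAndCombine(
                listOfAs,
                FlatMapWithCombinerSketch::fakeRequestAa,
                (a, aa) -> { a.set(aa); return a; });
    }
}
```

Calling bindAll on a list of As returns the same A objects, each with its aa field filled in, which is the role the Func2 plays in the chain above.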

From here on I am still not sure how you want to continue: Are you ready to just subscribe to the resulting Observable of As? That way you could handle each of the As (onNext) or just wait until all are done (onCompleted).

ADDENDUM: To collect all items into a single List at the end, that is, to turn your Observable<A> into an Observable<List<A>>, use toList().

https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#tolist

So you have:

Observable<List<A>> observableOfListOfAs = observableOfAs.toList();

If you need more fine-grained control over how the list is built, you can also use reduce.

https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#reduce
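
As a rough illustration of what such control could look like, the sketch below folds items into a list and skips every A that never got an AA. It is a synchronous, standard-library analogue of reduce with a seed and an accumulator, not RxJava code; ReduceSketch, collectBound and the simplified A class (with a String in place of AA) are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class ReduceSketch {

    // Hypothetical stand-in for A; aa stays null when no AA was bound.
    static class A {
        final String id;
        final String aa;
        A(String id, String aa) { this.id = id; this.aa = aa; }
    }

    // Synchronous analogue of reduce(seed, accumulator): start from the seed
    // and feed every item through the accumulator, keeping the running state.
    static <T, R> R reduce(Iterable<T> source, R seed, BiFunction<R, T, R> accumulator) {
        R state = seed;
        for (T item : source) {
            state = accumulator.apply(state, item);
        }
        return state;
    }

    // Builds the final list by hand and keeps only the As that have an AA.
    static List<A> collectBound(List<A> as) {
        return reduce(as, new ArrayList<A>(), (list, a) -> {
            if (a.aa != null) {
                list.add(a);
            }
            return list;
        });
    }
}
```

With toList() every item ends up in the list; with a reduce-style accumulator you decide per item what happens to the list being built.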

For the Bs, simply duplicate the whole flow you used for the As.

You can then use zip to wait for both flows to complete:

Observable.zip(
    observableOfListOfAs,
    observableOfListOfBs,
    new Func2<List<A>, List<B>, MyPairOfLists>() {

        @Override
        public MyPairOfLists call(List<A> as, List<B> bs) {
            return new MyPairOfLists(as, bs);
        }
    }
)
.subscribe(new Subscriber<MyPairOfLists>() {

    // onError() and onCompleted() are omitted here

    @Override
    public void onNext(MyPairOfLists pair) {
        // now both the as and the bs are ready to use:

        List<A> as = pair.getAs();
        List<B> bs = pair.getBs();

        // do something here!
    }
});

I suppose you can guess the definition of MyPairOfLists.
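
In case it helps, one possible definition is sketched below. It is only a guess at a reasonable shape: a small immutable holder with the getAs() and getBs() accessors used above. The wrapper class MyPairOfListsSketch and the empty A and B placeholders are hypothetical and only there to keep the example self-contained; in your project you would use your own A and B.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class MyPairOfListsSketch {

    // Empty placeholders for the question's A and B types.
    static class A { }
    static class B { }

    // Immutable holder for the two result lists.
    static final class MyPairOfLists {
        private final List<A> as;
        private final List<B> bs;

        MyPairOfLists(List<A> as, List<B> bs) {
            // Copy the lists so later changes by the caller do not leak in.
            this.as = Collections.unmodifiableList(new ArrayList<>(as));
            this.bs = Collections.unmodifiableList(new ArrayList<>(bs));
        }

        List<A> getAs() { return as; }

        List<B> getBs() { return bs; }
    }
}
```

The defensive copies are a design choice, not a requirement; a plain class with two fields would work just as well for passing the lists from zip to onNext.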

david.mihola answered Nov 18 '22 17:11
