
Proper usage of Retrofit + RxJava's combineLatest

I want to perform 2 network calls asynchronously - I'm using Retrofit+RxJava to accomplish this. This logic is from a simple Runner class to test out the solution. NOTE: This concerns mostly RxJava on the server-side.

My code looks like the following:

public static void main(String[] args) throws Exception {
  Api api = ...;

  Observable.combineLatest(
      api.getStates(),
      api.getCmsContent(),
      new Func2<List<State>, CmsContent, String>() {
        @Override public String call(List<State> states, CmsContent content) {
          ...
          return "PLACEHOLDER";
        }
      })
      .observeOn(Schedulers.immediate())
      .subscribeOn(Schedulers.immediate())
      .subscribe(new Observer<String>() {
        @Override public void onCompleted() {
          System.out.println("COMPLETED");
        }

        @Override public void onError(Throwable e) {
          System.out.println("ERROR: " + e.getMessage());
        }

        @Override public void onNext(String s) {
          // I don't care what's returned here
        }
      });
}

Three questions:

  1. Is Observable.combineLatest the best operator to use when you want to execute multiple REST calls asynchronously and proceed when all calls have finished?
  2. My Func2 implementation currently returns a String. After the 2 API calls execute, I'll process the results within the Func2#call() method. I don't care what is returned - there must be a better way to handle this, though - am I correct?
  3. The API calls are correctly executed with the code above. But the main method doesn't complete with the proper Process finished with exit code 0 when I run the program. What could be causing the code to hang?

UPDATE - 2015-05-14

Based on the recommendation, I've changed the logic to the following:

public static void main(String[] args) throws Exception {
  Api api = ...;

  Observable.zip(
      api.getStates(),
      api.getCmsContent(),
      new Func2<List<State>, CmsContent, Boolean>() {
        @Override public Boolean call(List<State> states, CmsContent content) {
          // process data
          return true;
        }
      })
      .subscribeOn(Schedulers.io())
      .toBlocking()
      .first();
}

This looks like the solution I was looking for. I'm going to use it for some time to see if I run into any troubles.

asked May 12 '15 18:05 by Kasa


2 Answers

1) If you know each source will emit exactly one value, combineLatest is as good as zip.

2) What would you like to do? You get the pair of values in your Func2, and if you don't really care what travels through onNext, return a value of your choosing.

3) Schedulers.immediate() is not a real scheduler in some sense and is quite prone to same-pool deadlock scenarios. You really don't need to use it. If you want to block the main thread until the async work is finished, use toBlocking().first() for example.
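To make point 1 concrete, here is a minimal plain-Java model of the two pairing rules. It is a sketch of the semantics only, not RxJava's implementation: the Event class and the zip/combineLatest helpers are hypothetical names written for this illustration, and completion and error handling are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class ZipVsCombineLatest {

    /** One emission: the source it came from ('A' or 'B') and its value. Hypothetical helper type. */
    static final class Event {
        final char source;
        final String value;

        Event(char source, String value) {
            this.source = source;
            this.value = value;
        }
    }

    /** zip rule: pair the n-th value of A with the n-th value of B. */
    static List<String> zip(List<Event> events) {
        Deque<String> a = new ArrayDeque<>();
        Deque<String> b = new ArrayDeque<>();
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            (e.source == 'A' ? a : b).add(e.value);
            // Emit only when both sides have an unpaired value waiting.
            if (!a.isEmpty() && !b.isEmpty()) {
                out.add(a.poll() + "+" + b.poll());
            }
        }
        return out;
    }

    /** combineLatest rule: once both sides have emitted, re-emit on every new value. */
    static List<String> combineLatest(List<Event> events) {
        String latestA = null;
        String latestB = null;
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            if (e.source == 'A') {
                latestA = e.value;
            } else {
                latestB = e.value;
            }
            if (latestA != null && latestB != null) {
                out.add(latestA + "+" + latestB);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // One value per source, as with two single-response REST calls: same result.
        List<Event> single = Arrays.asList(new Event('A', "states"), new Event('B', "cms"));
        System.out.println(zip(single));           // [states+cms]
        System.out.println(combineLatest(single)); // [states+cms]

        // Several values per source: the two rules diverge.
        List<Event> multi = Arrays.asList(
                new Event('A', "a1"), new Event('B', "b1"),
                new Event('A', "a2"), new Event('A', "a3"), new Event('B', "b2"));
        System.out.println(zip(multi));            // [a1+b1, a2+b2]
        System.out.println(combineLatest(multi));  // [a1+b1, a2+b1, a3+b1, a3+b2]
    }
}
```

With one emission per call, which is what a single Retrofit request produces, both rules give one combined result, so the choice only matters once a source can emit more than once.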

answered Sep 21 '22 02:09 by akarnokd


1) No, the best choice is zip(). combineLatest is a good fit when one of the two (or more) APIs returns different results more slowly than the others; in essence it caches the latest value from each source.

2) Func2 facilitates merging the results. It is better (architecture-wise) to process the result in either onNext() or onError(). You can use a simple Pair<T,Y> class to pass the results from the Func2 to onNext().

3) Nothing's wrong. As said, the result should be processed in onNext() and not in onCompleted(). According to Retrofit's source code, the results are passed only (and correctly so) in onNext().
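A minimal sketch of the Pair<T,Y> idea from point 2, in plain Java so it runs without RxJava on the classpath. The assumptions: java.util.function.BiFunction stands in for Func2, a println stands in for the body of onNext(), and List<String> and String are placeholder types for the State list and CmsContent from the question. PairExample, Pair, and combine are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class PairExample {

    /** Simple immutable holder for the two results. Hypothetical helper type. */
    static final class Pair<T, Y> {
        final T first;
        final Y second;

        Pair(T first, Y second) {
            this.first = first;
            this.second = second;
        }
    }

    /** The combiner only bundles the two values; it does no processing itself. */
    static <T, Y> Pair<T, Y> combine(T first, Y second) {
        return new Pair<>(first, second);
    }

    public static void main(String[] args) {
        // Stand-in for the Func2 passed to zip(): merge, nothing else.
        BiFunction<List<String>, String, Pair<List<String>, String>> combiner =
                (states, content) -> combine(states, content);

        // Placeholder data in place of the two API responses.
        Pair<List<String>, String> result =
                combiner.apply(Arrays.asList("CA", "NY"), "welcome banner");

        // Stand-in for onNext(): the processing happens here, on the bundled pair.
        System.out.println(result.first.size() + " states, content: " + result.second);
    }
}
```

Keeping the combiner free of side effects means the processing lives in one place, and an exception thrown during processing is not mixed up with the merge step.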

Hope those help.

answered Sep 19 '22 02:09 by Diolor