
Chaining Retrofit services w/ RxJava support

I am having trouble chaining observables using Retrofit's RxJava support. I'm probably misunderstanding how to use it; otherwise it could be a bug in Retrofit. Hopefully someone here can help me understand what's going on. Edit: I am using the MockRestAdapter for these responses. This might be relevant, as I see the RxSupport implementations differ slightly.

This is a fake banking app. It tries to do a transfer and, once the transfer has completed, it should make an accounts request to update the account values. This is basically just an excuse for me to try out flatMap. Unfortunately the following code doesn't work; no subscribers ever get notified:

Case 1: chaining two retrofit-produced observables

The transfer service (note: returns a retrofit-produced observable):

@FormUrlEncoded @POST("/user/transactions/")
public Observable<TransferResponse> transfer(@Field("session_id") String sessionId,
                                             @Field("from_account_number") String fromAccountNumber,
                                             @Field("to_account_number") String toAccountNumber,
                                             @Field("amount") String amount);

The account service (note: returns a retrofit-produced observable):

@FormUrlEncoded @POST("/user/accounts")
public Observable<List<Account>> getAccounts(@Field("session_id") String sessionId);

Chains two retrofit-produced observables together:

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return accountsService.getAccounts(session.getSessionId());
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

Case 2: creating my own observable and chaining with a retrofit-produced one

If I ignore the built-in Rx support in Retrofit for the "flat mapped" call, it works perfectly! All subscribers get notified. See below:

The new accounts service (note: does not produce an observable):

@FormUrlEncoded @POST("/user/accounts")
public List<Account> getAccountsBlocking(@Field("session_id") String sessionId);

Create my own observable and emit the items myself:

transfersService.transfer(session.getSessionId(), fromAccountNumber, toAccountNumber, amount)
            .flatMap(new Func1<TransferResponse, Observable<? extends List<Account>>>() {
                @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
                    return Observable.create(new Observable.OnSubscribe<List<Account>>() {
                        @Override public void call(Subscriber<? super List<Account>> subscriber) {
                            List<Account> accounts = accountsService.getAccountsBlocking(session.getSessionId());
                            subscriber.onNext(accounts);
                            subscriber.onCompleted();
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

Any help would be greatly appreciated!

Asked by Bradley Campbell, Jul 15 '14 01:07



1 Answer

Yes, you should be able to chain observables from Retrofit. There seems to be a bug in MockRestAdapter$MockRxSupport:createMockObservable (a private class): the way subscribing the subscriber to the observable is scheduled seems wrong. The subscription only happens after the HttpExecutor thread itself has been started. I believe the original flow, which comes from your Schedulers.io() thread, is completed and unsubscribed before the Observable returned by mockHandler.invokeSync can be subscribed to. Hopefully this explanation makes some sense if you take a look at the code in the retrofit-mock module.

As a workaround with the current code, and only when using retrofit-mock, you could replace the internal default Executor with your own ImmediateExecutor implementation. At least when testing mocks, this gives you a single-threaded flow that runs on the thread provided by your Schedulers.io().

// ImmediateExecutor.java
public class ImmediateExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}

// Create your RestAdapter with your ImmediateExecutor
RestAdapter adapter = new RestAdapter.Builder()
            .setEndpoint(endpoint)
            .setExecutors(new ImmediateExecutor(), null)
            .build();
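
To illustrate why an immediate executor gives a single-threaded flow, here is a minimal stand-alone sketch that uses only the JDK (no Retrofit or RxJava). The class name ImmediateExecutorDemo and the helper threadUsedBy are hypothetical names made up for this illustration. It records which thread a task runs on when submitted to an ImmediateExecutor versus an ordinary single-thread pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ImmediateExecutorDemo {

    /** Same idea as the ImmediateExecutor above: run the task inline on the caller's thread. */
    public static final class ImmediateExecutor implements Executor {
        @Override public void execute(Runnable command) {
            command.run();
        }
    }

    /** Hypothetical helper: returns the name of the thread on which the executor ran the task. */
    public static String threadUsedBy(Executor executor) throws InterruptedException {
        final AtomicReference<String> name = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        // Wait for the task; with an immediate executor it has already finished here.
        done.await(5, TimeUnit.SECONDS);
        return name.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("caller:    " + Thread.currentThread().getName());
            System.out.println("immediate: " + threadUsedBy(new ImmediateExecutor()));
            System.out.println("pooled:    " + threadUsedBy(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The immediate executor reports the caller's own thread, while the pool reports a separate worker thread. In the workaround above, that means the work stays on whichever thread called into the adapter (for example, one supplied by Schedulers.io()) instead of being handed off to another thread.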

To fix the issue at the source, you can also include the retrofit-mock project as source in your project and modify the MockRestAdapter$MockRxSupport:createMockObservable method using the code below. I've tested your use case and it does fix the problem.

--- MockRestAdapter.java$MockRxSupport ----

Observable createMockObservable(final MockHandler mockHandler, final RestMethodInfo methodInfo,
        final RequestInterceptor interceptor, final Object[] args) {
      return Observable.create(new Observable.OnSubscribe<Object>() {
        @Override public void call(final Subscriber<? super Object> subscriber) {
          try {
            if (subscriber.isUnsubscribed()) return;
            Observable observable =
                (Observable) mockHandler.invokeSync(methodInfo, interceptor, args);

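            // subscribeOn returns a new Observable rather than modifying this one,
            // so subscribe to the returned instance for the scheduler to take effect.
            //noinspection unchecked
            observable.subscribeOn(Schedulers.from(httpExecutor)).subscribe(subscriber);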

          } catch (RetrofitError e) {
            subscriber.onError(errorHandler.handleError(e));
          } catch (Throwable e) {
            subscriber.onError(e);
          }
        }
      });
    }

I created an issue with the Retrofit project here; we'll see if they accept it.

Answered by Miguel, Oct 02 '22 03:10