Chaining RxJava observables with callbacks/listeners

I am using Retrofit with Observables, and would like to chain the observables. Usually it works well with functions like map() or flatMap(), since the api returns an Observable that does the task. But in this case I have to do the following:

  1. getKey() from the api
  2. Use the returned key in another library Foo and wait for the callback to be called.
  3. When the callback returns, send the result to the api.

I'd like this to be a single chained call so that I only have to subscribe once. I'm guessing I could use merge() or join() or something similar, but I'm not sure what the best approach is for handling the callback.

Is there a way to make this better? This is what I have so far:

api.getKey().subscribe(new Action1<String>() {
   @Override
   public void call(String key) {
      Foo foo = new Foo();
      foo.setAwesomeCallback(new AwesomeCallback() {
         @Override
         public void onAwesomeReady(String awesome) {
            api.sendAwesome(awesome)
                    .subscribe(new Action1<Void>() {
                       @Override
                       public void call(Void aVoid) {
                           handleAwesomeSent();
                       }
                    });
         }
      });
      foo.makeAwesome();
   }
});

hiBrianLee asked Apr 16 '15 15:04


3 Answers

Adapting clemp6r's solution, here is another one that needs neither Subjects nor nested Subscriptions:

api.getKey().flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {

        // Wrap the callback-based Foo API in an Observable.
        return Observable.create(new Observable.OnSubscribe<String>(){

            @Override
            public void call(final Subscriber<? super String> subscriber) {
                Foo foo = new Foo();
                foo.setAwesomeCallback(new AwesomeCallback() {
                    @Override
                    public void onAwesomeReady(String awesome) {
                        // Emit the single result, then complete.
                        if (! subscriber.isUnsubscribed()) {
                            subscriber.onNext(awesome);
                            subscriber.onCompleted();
                        }
                    }
                });
                foo.makeAwesome();
            }
        });
    }
}).flatMap(new Func1<String, Observable<Void>>() {
    @Override
    public Observable<Void> call(String awesome) {
        // sendAwesome() emits Void, as in the question's code.
        return api.sendAwesome(awesome);
    }
}).subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        handleAwesomeSent();
    }
});

In general, I think it is always possible to wrap any callback-based asynchronous operation in an Observable using Observable.create().
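
To illustrate the same wrapping idea without an RxJava dependency, here is a minimal, self-contained sketch that uses the JDK's CompletableFuture in place of Observable. It is an analogue, not the RxJava code from this answer: the Foo class body, its constructor taking the key, and the string values returned by getKey() and sendAwesome() are hypothetical stand-ins chosen only so the example runs.

```java
import java.util.concurrent.CompletableFuture;

public class CallbackChainSketch {

    /** Hypothetical stand-in for the question's callback interface. */
    interface AwesomeCallback {
        void onAwesomeReady(String awesome);
    }

    /** Hypothetical stand-in for the callback-based library Foo. */
    static class Foo {
        private final String key;
        private AwesomeCallback callback;

        Foo(String key) {
            this.key = key;
        }

        void setAwesomeCallback(AwesomeCallback callback) {
            this.callback = callback;
        }

        void makeAwesome() {
            // Invoke the callback from another thread, as an async library might.
            new Thread(() -> callback.onAwesomeReady("awesome-" + key)).start();
        }
    }

    /** Step 1: stand-in for api.getKey(). */
    static CompletableFuture<String> getKey() {
        return CompletableFuture.completedFuture("key");
    }

    /** Step 2: wrap the callback so its result becomes a composable value. */
    static CompletableFuture<String> makeAwesome(String key) {
        CompletableFuture<String> result = new CompletableFuture<>();
        Foo foo = new Foo(key);
        // The callback completes the future, much like onNext + onCompleted.
        foo.setAwesomeCallback(result::complete);
        foo.makeAwesome();
        return result;
    }

    /** Step 3: stand-in for api.sendAwesome(). */
    static CompletableFuture<String> sendAwesome(String awesome) {
        return CompletableFuture.completedFuture("sent:" + awesome);
    }

    /** The whole flow as one chain; thenCompose plays the role of flatMap. */
    static CompletableFuture<String> chain() {
        return getKey()
                .thenCompose(CallbackChainSketch::makeAwesome)
                .thenCompose(CallbackChainSketch::sendAwesome);
    }

    public static void main(String[] args) {
        System.out.println(chain().join()); // prints "sent:awesome-key"
    }
}
```

The wrapper in makeAwesome() is the only place that knows about the callback; everything downstream sees an ordinary composable value, which is the same separation Observable.create() gives in the RxJava version.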

david.mihola answered Oct 08 '22 20:10


You can use a PublishSubject to turn the callback-based API into an Observable.

Try something like this (not tested):

api.getKey().flatMap(new Func1<String, Observable<String>>() {
   @Override
   public Observable<String> call(String key) {
      Foo foo = new Foo();
      // Must be final to be captured by the anonymous callback class.
      // Note: a PublishSubject does not replay, so a value emitted before
      // the downstream subscribes would be lost.
      final PublishSubject<String> subject = PublishSubject.create();
      foo.setAwesomeCallback(new AwesomeCallback() {
         @Override
         public void onAwesomeReady(String awesome) {
            subject.onNext(awesome);
            subject.onCompleted();
         }
      });
      foo.makeAwesome();

      return subject;
   }
}).flatMap(new Func1<String, Observable<Void>>() {
   @Override
   public Observable<Void> call(String awesome) {
       // sendAwesome() emits Void, as in the question's code.
       return api.sendAwesome(awesome);
   }
}).subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        handleAwesomeSent();
    }
});

clemp6r answered Oct 08 '22 18:10


Api api = new Api() {
  @Override Single<String> getKey() {
    return Single.just("apiKey");
  }
};

api.getKey()
    .flatMap(key -> Single.<String>create( singleSubscriber -> {
        Foo foo = new Foo();
        foo.setAwesomeCallback(awesome -> {
          try { singleSubscriber.onSuccess(awesome);}
          catch (Exception e) { singleSubscriber.onError(e); }
        });
        foo.makeAwesome();
    }))
    .flatMapCompletable(
        awesome -> Completable.create(completableSubscriber -> {
          try {
            sendAwesome(awesome);
            completableSubscriber.onCompleted();
          } catch (Exception e) { completableSubscriber.onError(e); }
        }))
    .subscribe(this::handleAwesomeSent, throwable -> { });

See gist for full anonymous class example

This implementation adapts david.mihola's answer by using the Single and Completable types together with flatMapCompletable(), which keeps each step type-safe and specific.

ersin-ertan answered Oct 08 '22 18:10