I am using Retrofit with Observables, and would like to chain the observables. Usually it works well with functions like map() or flatMap(), since the api returns an Observable that does the task. But in this case I have to do the following:
1. Get some data from the api.
2. Create a Foo and wait for the callback to be called.
3. Send the callback's result back to the api.

I'd like this to be a single chained call so that I just have to subscribe once. I'm guessing I can use merge() or join() or something, but wasn't sure what the best approach would be to handle the callback.
Is there a way to make this better? This is what I have so far:
api.getKey().subscribe(new Action1<String>() {
    @Override
    public void call(String key) {
        Foo foo = new Foo();
        foo.setAwesomeCallback(new AwesomeCallback() {
            @Override
            public void onAwesomeReady(String awesome) {
                api.sendAwesome(awesome)
                        .subscribe(new Action1<Void>() {
                            @Override
                            public void call(Void aVoid) {
                                handleAwesomeSent();
                            }
                        });
            }
        });
        foo.makeAwesome();
    }
});
Adapting clemp6r's solution, here is another one that needs neither Subjects nor nested Subscriptions:
api.getKey().flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {
        // Bridge the callback into an Observable that emits one item and completes.
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(final Subscriber<? super String> subscriber) {
                Foo foo = new Foo();
                foo.setAwesomeCallback(new AwesomeCallback() {
                    @Override
                    public void onAwesomeReady(String awesome) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(awesome);
                            subscriber.onCompleted();
                        }
                    }
                });
                foo.makeAwesome();
            }
        });
    }
}).flatMap(new Func1<String, Observable<Void>>() {
    @Override
    public Observable<Void> call(String awesome) {
        // sendAwesome() emits Void, matching the Action1<Void> subscriber below.
        return api.sendAwesome(awesome);
    }
}).subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        handleAwesomeSent();
    }
});
In general I think it is always possible to wrap any callback-based asynchronous operation in an Observable using Observable.create().
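To make the wrapping idea concrete without pulling in RxJava, here is a minimal dependency-free sketch of the same chain. It is an analogue, not the RxJava code from this thread: the JDK's CompletableFuture stands in for Observable, and thenCompose() plays the role of flatMap(). The Foo class, its constructor taking the key, the stubbed getKey() and sendAwesome() methods, and all returned strings are hypothetical stand-ins invented for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class CallbackChainSketch {

    // Hypothetical stand-in for the callback interface in the question.
    interface AwesomeCallback {
        void onAwesomeReady(String awesome);
    }

    // Hypothetical stand-in for Foo; the real class is not shown in the thread.
    static class Foo {
        private final String key;
        private AwesomeCallback callback;

        Foo(String key) {
            this.key = key;
        }

        void setAwesomeCallback(AwesomeCallback callback) {
            this.callback = callback;
        }

        // Fires the callback from another thread, as an asynchronous API might.
        void makeAwesome() {
            CompletableFuture.runAsync(() -> callback.onAwesomeReady("awesome-" + key));
        }
    }

    // The wrapping step: the returned future completes when the callback fires.
    static CompletableFuture<String> makeAwesome(String key) {
        CompletableFuture<String> result = new CompletableFuture<>();
        Foo foo = new Foo(key);
        foo.setAwesomeCallback(result::complete);
        try {
            foo.makeAwesome();
        } catch (RuntimeException e) {
            // Surface a failure to start the work as an error on the chain.
            result.completeExceptionally(e);
        }
        return result;
    }

    // Stubbed api calls (assumptions, not the real Retrofit interface).
    static CompletableFuture<String> getKey() {
        return CompletableFuture.completedFuture("apiKey");
    }

    static CompletableFuture<String> sendAwesome(String awesome) {
        return CompletableFuture.completedFuture("sent:" + awesome);
    }

    // One chained call: get the key, wait for the callback, send the result.
    static String runChain() {
        return getKey()
                .thenCompose(CallbackChainSketch::makeAwesome)
                .thenCompose(CallbackChainSketch::sendAwesome)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(runChain()); // prints "sent:awesome-apiKey"
    }
}
```

The shape is the same as in the Observable.create() version: the callback is registered first, the work is started afterwards, and the callback's only job is to hand its value to the async wrapper so the next step in the chain can pick it up.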
You can use a PublishSubject to turn the callback-based API into an Observable.
Try something like this (not tested):
api.getKey().flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {
        Foo foo = new Foo();
        // final so the anonymous callback below can capture it
        final PublishSubject<String> subject = PublishSubject.create();
        foo.setAwesomeCallback(new AwesomeCallback() {
            @Override
            public void onAwesomeReady(String awesome) {
                subject.onNext(awesome);
                subject.onCompleted();
            }
        });
        foo.makeAwesome();
        return subject;
    }
}).flatMap(new Func1<String, Observable<Void>>() {
    @Override
    public Observable<Void> call(String awesome) {
        // sendAwesome() emits Void, matching the Action1<Void> subscriber below.
        return api.sendAwesome(awesome);
    }
}).subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        handleAwesomeSent();
    }
});
Api api = new Api() {
    @Override Single<String> getKey() {
        return Single.just("apiKey");
    }
};
api.getKey()
    .flatMap(key -> Single.<String>create(singleSubscriber -> {
        Foo foo = new Foo();
        foo.setAwesomeCallback(awesome -> {
            try {
                singleSubscriber.onSuccess(awesome);
            } catch (Exception e) {
                singleSubscriber.onError(e);
            }
        });
        foo.makeAwesome();
    }))
    .flatMapCompletable(
        awesome -> Completable.create(completableSubscriber -> {
            try {
                sendAwesome(awesome);
                completableSubscriber.onCompleted();
            } catch (Exception e) {
                completableSubscriber.onError(e);
            }
        }))
    .subscribe(this::handleAwesomeSent, throwable -> { });
See gist for full anonymous class example
This implementation adapts david.mihola's answer by using the Single and Completable types together with flatMapCompletable(), which keeps each step type-safe and specific about what it emits.