I am using Retrofit with Observables and would like to chain the observables. Usually this works well with functions like map() or flatMap(), since the api returns an Observable that does the task. But in this case I have to do the following:
1. Call the api to get a key.
2. Use Foo and wait for its callback to be called.
3. Send the callback's result to the api.

I'd like this to be a single chained call so that I only have to subscribe once. I'm guessing I can use merge() or join() or something similar, but I wasn't sure what the best approach would be to handle the callback.
Is there a way to make this better? This is what I have so far:
api.getKey().subscribe(new Action1<String>() {
    @Override
    public void call(String key) {
        Foo foo = new Foo();
        foo.setAwesomeCallback(new AwesomeCallback() {
            @Override
            public void onAwesomeReady(String awesome) {
                api.sendAwesome(awesome)
                    .subscribe(new Action1<Void>() {
                        @Override
                        public void call(Void aVoid) {
                            handleAwesomeSent();
                        }
                    });
            }
        });
        foo.makeAwesome();
    }
});
Adapting clemp6r's solution, here is another one that needs neither Subjects nor nested Subscriptions:
api.getKey().flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(final Subscriber<? super String> subscriber) {
                Foo foo = new Foo();
                foo.setAwesomeCallback(new AwesomeCallback() {
                    @Override
                    public void onAwesomeReady(String awesome) {
                        // Only emit if the downstream is still subscribed.
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(awesome);
                            subscriber.onCompleted();
                        }
                    }
                });
                foo.makeAwesome();
            }
        });
    }
}).flatMap(new Func1<String, Observable<Void>>() {
    @Override
    public Observable<Void> call(String awesome) {
        // sendAwesome() emits Void, as in the question's code.
        return api.sendAwesome(awesome);
    }
}).subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        handleAwesomeSent();
    }
});
In general, I think it is always possible to wrap any callback-based asynchronous operation in an Observable using Observable.create().
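To show the wrap-the-callback idea in a form that runs without RxJava on the classpath, here is a minimal sketch that swaps in the JDK's CompletableFuture for Observable. It is not the RxJava API, only an analogy: thenCompose() plays the role of flatMap(). The Foo class, getKey() and sendAwesome() below are hypothetical stand-ins for the ones in the question, and the callback fires synchronously here, which a real library may not do.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in for the callback-based Foo from the question.
final class Foo {
    private final String key;
    private Consumer<String> callback;

    Foo(String key) { this.key = key; }

    void setAwesomeCallback(Consumer<String> callback) { this.callback = callback; }

    // Fires the callback synchronously; a real library would usually call back later.
    void makeAwesome() { callback.accept("awesome-" + key); }
}

final class CallbackChainSketch {
    // Hypothetical stand-in for api.getKey().
    static CompletableFuture<String> getKey() {
        return CompletableFuture.completedFuture("key");
    }

    // Hypothetical stand-in for api.sendAwesome(); returns a marker string for demonstration.
    static CompletableFuture<String> sendAwesome(String awesome) {
        return CompletableFuture.completedFuture("sent:" + awesome);
    }

    // Wraps the callback: the returned future completes when the callback fires.
    static CompletableFuture<String> makeAwesome(String key) {
        CompletableFuture<String> result = new CompletableFuture<>();
        Foo foo = new Foo(key);
        foo.setAwesomeCallback(result::complete);
        try {
            foo.makeAwesome();
        } catch (RuntimeException e) {
            // Surface failures through the future instead of throwing.
            result.completeExceptionally(e);
        }
        return result;
    }

    // One chained pipeline: get key, wait for the callback, send the result.
    static CompletableFuture<String> chain() {
        return getKey()
            .thenCompose(CallbackChainSketch::makeAwesome)
            .thenCompose(CallbackChainSketch::sendAwesome);
    }

    public static void main(String[] args) {
        System.out.println(chain().join());
    }
}
```

The shape is the same as in the RxJava version: the callback is registered inside the wrapper, and the wrapper's result is what the next stage of the chain consumes, so the caller only attaches one handler at the end.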
You can use a PublishSubject to transform the callback-based API into an Observable.
Try something like this (not tested):
api.getKey().flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {
        Foo foo = new Foo();
        // Must be final to be captured by the anonymous callback class.
        final PublishSubject<String> subject = PublishSubject.create();
        foo.setAwesomeCallback(new AwesomeCallback() {
            @Override
            public void onAwesomeReady(String awesome) {
                subject.onNext(awesome);
                subject.onCompleted();
            }
        });
        foo.makeAwesome();
        return subject;
    }
}).flatMap(new Func1<String, Observable<Void>>() {
    @Override
    public Observable<Void> call(String awesome) {
        // sendAwesome() emits Void, as in the question's code.
        return api.sendAwesome(awesome);
    }
}).subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        handleAwesomeSent();
    }
});
Api api = new Api() {
    @Override
    public Single<String> getKey() {
        return Single.just("apiKey");
    }
};

api.getKey()
    .flatMap(key -> Single.<String>create(singleSubscriber -> {
        Foo foo = new Foo();
        foo.setAwesomeCallback(awesome -> {
            try {
                singleSubscriber.onSuccess(awesome);
            } catch (Exception e) {
                singleSubscriber.onError(e);
            }
        });
        foo.makeAwesome();
    }))
    .flatMapCompletable(
        awesome -> Completable.create(completableSubscriber -> {
            try {
                sendAwesome(awesome);
                completableSubscriber.onCompleted();
            } catch (Exception e) {
                completableSubscriber.onError(e);
            }
        }))
    .subscribe(this::handleAwesomeSent, throwable -> { });
See gist for full anonymous class example
This implementation adapts david.mihola's answer by using the Single and Completable types together with flatMapCompletable(), while staying type-safe and specific.