While debugging an RxJava network call in an app, I came across a situation where disposing or clearing the Disposable returned by subscribing to a chain of observables disposes only the first observable, not the other observables chained by flatMap.
Have a look at the following demo code snippet:
CompositeDisposable testCompositeDisposal = new CompositeDisposable();

private void testLoadData() {
    Disposable disposable = Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "First: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    }).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "Second: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    })).doOnNext(value -> {
        Log.w("Debug: ", "doONNext");
    }).doOnDispose(() -> {
        Log.w("Debug: ", "doOnDispose: observable has been disposed");
    }).subscribe();
    testCompositeDisposal.add(disposable);
}

@Override
public void onStop() {
    super.onStop();
    testCompositeDisposal.clear();
}
Output:
W/Debug:: First: 0
W/Debug:: doOnDispose: observable has been disposed // I dispose Observable chain here.
W/Debug:: First: 1
W/Debug:: First: 2
W/Debug:: First: 3
W/Debug:: First: 4
As you can see in the log output above, when I dispose the given RxJava observable chain, only the first observable stops emitting items.
I want to stop all of the chained observables.
What is the idiomatic way to solve this issue?
Two things:
First, flatMap may pre-consume items from upstream (up to 16 on Android).
Second, before calling onNext you should check whether the observer is disposed (via .isDisposed()) and abort when that happens. Also, the second observable (the one created inside flatMap) gets terminated (actually it never gets called). The first one continues.
EDIT
private void testLoadData() {
    Disposable disposable = Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            if (sbr.isDisposed()) return; // this will cause subscription to terminate.
            Thread.sleep(3000);
            Log.w("Debug: ", "First: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    }).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "Second: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    })).doOnNext(value -> {
        Log.w("Debug: ", "doONNext");
    }).doOnDispose(() -> {
        Log.w("Debug: ", "doOnDispose: observable has been disposed");
    }).subscribe();
    testCompositeDisposal.add(disposable);
}
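To illustrate why the isDisposed() check matters, here is a rough model in plain Java with no RxJava dependency. SimpleEmitter, produceUnchecked and produceChecked are hypothetical names made up for this sketch; they are not RxJava classes and only mimic the behaviour described above: after dispose, onNext silently drops items, but a loop that never looks at the flag keeps doing its work anyway.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class DisposeCheckDemo {

    /** Hypothetical stand-in for an emitter: items sent after dispose() are dropped. */
    static final class SimpleEmitter {
        private final AtomicBoolean disposed = new AtomicBoolean(false);
        final List<Integer> delivered = new ArrayList<>();

        void onNext(int value) {
            if (!disposed.get()) {
                delivered.add(value);
            }
        }

        void dispose() {
            disposed.set(true);
        }

        boolean isDisposed() {
            return disposed.get();
        }
    }

    /** Loop that never checks the flag: it runs all 5 iterations even after dispose. */
    static int produceUnchecked(SimpleEmitter emitter, int disposeAt) {
        int iterations = 0;
        for (int i = 0; i < 5; i++) {
            if (i == disposeAt) {
                emitter.dispose(); // simulates onStop() clearing the disposable mid-loop
            }
            iterations++;          // stands in for the expensive work (sleep, network call)
            emitter.onNext(i);     // silently dropped once disposed
        }
        return iterations;
    }

    /** Same loop, but it aborts as soon as it notices the emitter is disposed. */
    static int produceChecked(SimpleEmitter emitter, int disposeAt) {
        int iterations = 0;
        for (int i = 0; i < 5; i++) {
            if (i == disposeAt) {
                emitter.dispose();
            }
            if (emitter.isDisposed()) {
                return iterations; // stop doing work nobody is listening to
            }
            iterations++;
            emitter.onNext(i);
        }
        return iterations;
    }

    public static void main(String[] args) {
        System.out.println("unchecked iterations: " + produceUnchecked(new SimpleEmitter(), 1));
        System.out.println("checked iterations: " + produceChecked(new SimpleEmitter(), 1));
    }
}
```

In both variants only the first item is delivered, but the unchecked loop still runs to the end, which matches the "First: 1" to "First: 4" lines in the log. In the real chain the same check would presumably be needed in the inner loop inside flatMap as well, since that loop blocks in the same way.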