
Stop RxJava observable chain execution on disposing

While debugging an RxJava network call in an app, I came across a situation where disposing or clearing the Disposable returned by subscribing to a chain of observables disposes only the first observable, not the other observables chained via flatMap.

Have a look at the following demo code snippet:

CompositeDisposable testCompositeDisposal = new CompositeDisposable();

private void testLoadData() {
    Disposable disposable = Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "First: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    }).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "Second: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    })).doOnNext(value -> {
        Log.w("Debug: ", "doONNext");
    }).doOnDispose(()-> {
        Log.w("Debug: ", "doOnDispose: observable has been disposed");
    }).subscribe();

    testCompositeDisposal.add(disposable);
}

@Override
public void onStop() {
    super.onStop();
    testCompositeDisposal.clear();
}

Output:

 W/Debug:: First: 0
 W/Debug:: doOnDispose: observable has been disposed // I dispose Observable chain here.
 W/Debug:: First: 1
 W/Debug:: First: 2
 W/Debug:: First: 3
 W/Debug:: First: 4

As the log output above shows, when I dispose the RxJava observable chain, only the first observable stops emitting items.

I want to stop all of the chained observables.

What is the idiomatic way to solve this issue?

chandil03 asked Sep 04 '17 18:09


1 Answer

Two things:

  • flatMap may pre-consume items from upstream (up to 16 on Android);
  • Second, and more applicable to your use case: before you call onNext, check whether the emitter has been disposed (via .isDisposed()) and abort if it has.

Also, the second observable (the one created inside flatMap) does get terminated (actually, it never even gets called). The first one continues.

EDIT

private void testLoadData() {
    Disposable disposable = Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            if (sbr.isDisposed()) return;  // stop producing once the chain has been disposed
            Thread.sleep(3000);
            Log.w("Debug: ", "First: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    }).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "Second: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    })).doOnNext(value -> {
        Log.w("Debug: ", "doONNext");
    }).doOnDispose(() -> {
        Log.w("Debug: ", "doOnDispose: observable has been disposed");
    }).subscribe();

    testCompositeDisposal.add(disposable);
}
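
To see why the isDisposed() check matters, here is a minimal, framework-free sketch of the same situation. It does not use RxJava: ToyEmitter, runProducer and CooperativeDisposeSketch are hypothetical names made up for illustration, and the model assumes (as the log in the question suggests) that disposing only flips a flag and makes later onNext calls no-ops, while the producer loop itself keeps running unless it polls that flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Framework-free model of cooperative disposal.
 * ToyEmitter is a hypothetical stand-in for an emitter, not the RxJava class.
 */
public class CooperativeDisposeSketch {

    /** Hypothetical emitter: dispose() only flips a flag, it cannot stop a running loop. */
    static final class ToyEmitter {
        private final AtomicBoolean disposed = new AtomicBoolean();
        private final List<Integer> delivered = new ArrayList<>();

        void dispose() { disposed.set(true); }

        boolean isDisposed() { return disposed.get(); }

        /** Items emitted after dispose() are silently dropped. */
        void onNext(int value) {
            if (!disposed.get()) {
                delivered.add(value);
            }
        }

        int deliveredCount() { return delivered.size(); }
    }

    /**
     * Runs a 5-step producer loop and simulates the consumer disposing right
     * after step disposeAt. Returns how many iterations actually did work.
     */
    static int runProducer(ToyEmitter emitter, int disposeAt, boolean checkDisposed) {
        int iterations = 0;
        for (int i = 0; i < 5; i++) {
            if (checkDisposed && emitter.isDisposed()) {
                break; // cooperative exit: nobody is listening any more
            }
            iterations++; // stands in for the expensive work (the sleep in the question)
            emitter.onNext(i);
            if (i == disposeAt) {
                emitter.dispose(); // the consumer disposes mid-stream
            }
        }
        return iterations;
    }

    public static void main(String[] args) {
        ToyEmitter noCheck = new ToyEmitter();
        int a = runProducer(noCheck, 0, false);
        System.out.println("without check: " + a + " iterations, "
                + noCheck.deliveredCount() + " delivered");

        ToyEmitter withCheck = new ToyEmitter();
        int b = runProducer(withCheck, 0, true);
        System.out.println("with check: " + b + " iterations, "
                + withCheck.deliveredCount() + " delivered");
    }
}
```

Running main prints "without check: 5 iterations, 1 delivered" and then "with check: 1 iterations, 1 delivered". The first line mirrors the log in the question: the loop keeps doing work after disposal even though nothing reaches the consumer. The second shows the effect of polling the flag before each step.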
Tassos Bassoukos answered Sep 22 '22 12:09