Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava2 dispose() doesn't work if called on the object returned by doOnSubscribe()

I've got a problem with understanding why the following code doesn't work. Am I doing something wrong or is it some kind of bug in RxJava2 implementation?

private Disposable savedDisposable;

@Test
public void test() {
    final TestObserver<Integer> observer = new TestObserver<>();

    Observable<Integer> t = Observable.just(10)
            .delay(100, TimeUnit.MILLISECONDS)
            .doOnSubscribe(disposable -> savedDisposable = disposable);

    t.subscribe(observer);

    savedDisposable.dispose();  //this doesn't work
    //observer.dispose();       //this works  

    assertTrue(observer.isDisposed());
}
like image 416
kmalmur Avatar asked Apr 20 '17 07:04

kmalmur


1 Answers

To answer the posted question:

You are disposing in the middle thus the end Disposable can't know about its upstream has been disposed because dispose() calls always travel upstream.

There are the DisposableObserver, ResourceObserver, subscribeWith and the lambda-subscribe() methods that will get you a Disposable object at the very end which you can dispose via dispose().


On the issue list though, it turned out the OP wanted an Observer and Disposable to be present on the consumer type and discovered that this can be achieved via constrained generics, for example:

public static <T, K extends Observer<T> & Disposable> K subscribe(
        Observable<T> o, K observer) {
    o.subscribe(observer);
    return observer;
}
like image 97
akarnokd Avatar answered Nov 13 '22 20:11

akarnokd