I've got a problem with understanding why the following code doesn't work. Am I doing something wrong or is it some kind of bug in RxJava2 implementation?
private Disposable savedDisposable;
@Test
public void test() {
final TestObserver<Integer> observer = new TestObserver<>();
Observable<Integer> t = Observable.just(10)
.delay(100, TimeUnit.MILLISECONDS)
.doOnSubscribe(disposable -> savedDisposable = disposable);
t.subscribe(observer);
savedDisposable.dispose(); //this doesn't work
//observer.dispose(); //this works
assertTrue(observer.isDisposed());
}
To answer the posted question:
You are disposing in the middle thus the end Disposable
can't know about its upstream has been disposed because dispose()
calls always travel upstream.
There are the DisposableObserver
, ResourceObserver
, subscribeWith
and the lambda-subscribe()
methods that will get you a Disposable
object at the very end which you can dispose via dispose()
.
On the issue list though, it turned out the OP wanted an Observer
and Disposable
to be present on the consumer type and discovered that this can be achieved via constrained generics, for example:
public static <T, K extends Observer<T> & Disposable> K subscribe(
Observable<T> o, K observer) {
o.subscribe(observer);
return observer;
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With