My use case in is want to dispose after a certain condition in my onNext. So trying to use DisposableObserver. This is the code that works
Observable.just(1, 2, 3, 4)
.subscribe(new DisposableObserver<Integer>() {
@Override
public void onNext(Integer integer) {
System.out.println("onNext() received: " + integer);
if (integer == 2) {
dispose();
}
}
@Override
public void onError(Throwable e) { System.out.println("onError()"); }
@Override
public void onComplete() { System.out.println("onComplete()"); }
}
);
Now if you try to replace this with lambda it treats the lambda as
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
Doing it this way for now. By saving the disposable from onSubscribe and then calling disposable.dispose(); from onNext.
private Disposable disposable;
private void disposableObserverTest() {
Observable.just(1, 2, 3, 4)
.subscribe(integer -> {
System.out.println("onNext() received: " + integer);
if (integer == 2) {
disposable.dispose();
}
}, throwable -> System.out.println("error"),
() -> System.out.println("complete"),
disposable1 -> {
this.disposable = disposable1;
});
}
However, if you want to call dispose() directly how to do it with lambdas?
you can use takeUntil to close the observable.
@Test
public void takeUntil() throws Exception {
Observable.just(1, 2, 3, 4)
.takeUntil(integer -> integer == 2)
.test()
.assertValues(1, 2);
}
It's because in the first case you call
subscribe(DisposableObserver observer)
while in the second case you call
subscribe(Action1<? extends Integer> onNext, Action1<? extends Throwable> onError, Action0 onComplete)
That means in the second case you don't hold reference to the DisposableObserver
and therefore you can't call dispose()
on it.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With