
How to use DisposableObserver with lambda expressions in RxJava2

My use case is that I want to dispose after a certain condition is met in my onNext, so I am trying to use DisposableObserver. This is the code that works:

Observable.just(1, 2, 3, 4)
    .subscribe(new DisposableObserver<Integer>() {
      @Override
      public void onNext(Integer integer) {
        System.out.println("onNext() received: " + integer);
        if (integer == 2) {
          dispose();
        }
      }

      @Override
      public void onError(Throwable e) {
        System.out.println("onError()");
      }

      @Override
      public void onComplete() {
        System.out.println("onComplete()");
      }
    });

Now, if you try to replace this with a lambda, the compiler resolves the call to this overload instead:

subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)

For now I am doing it this way: saving the Disposable from onSubscribe and then calling disposable.dispose() from onNext.

  private Disposable disposable;

  private void disposableObserverTest() {
    Observable.just(1, 2, 3, 4)
        .subscribe(
            integer -> {
              System.out.println("onNext() received: " + integer);
              if (integer == 2) {
                disposable.dispose();
              }
            },
            throwable -> System.out.println("error"),
            () -> System.out.println("complete"),
            disposable1 -> this.disposable = disposable1);
  }
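
A side note on why the onSubscribe route matters here: Observable.just emits synchronously, so the Disposable returned by subscribe(...) only becomes available after every item has already been delivered. The sketch below illustrates this, assuming RxJava 2 is on the classpath; the class and method names are made up for the illustration.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical class name, used only for this illustration.
class ReturnedDisposableSketch {

  // Subscribes with a lambda and relies only on the Disposable returned by subscribe().
  static List<Integer> collect() {
    List<Integer> seen = new ArrayList<>();
    Disposable returned = Observable.just(1, 2, 3, 4)
        .subscribe(value -> seen.add(value));
    // By the time we get here, the synchronous source has already emitted everything,
    // so disposing now cannot stop the stream at 2.
    returned.dispose();
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(collect()); // prints [1, 2, 3, 4]
  }
}
```

With an asynchronous source the returned Disposable can be used from outside, but it is still not reachable from inside the onNext lambda without storing it somewhere first.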

However, if you want to call dispose() directly, how do you do it with lambdas?

bpr10 asked Apr 18 '17 11:04


2 Answers

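You can use takeUntil with a predicate to complete the stream once the condition is met. The item that satisfies the predicate is still emitted before the stream ends.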

@Test
public void takeUntil() throws Exception {
    Observable.just(1, 2, 3, 4)
            .takeUntil(integer -> integer == 2)
            .test()
            .assertValues(1, 2);
}
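
To make the boundary behaviour concrete, here is a small sketch comparing takeUntil with takeWhile on the same source. It assumes RxJava 2; the class and method names are hypothetical.

```java
import io.reactivex.Observable;
import java.util.List;

// Hypothetical class name, used only for this illustration.
class TakeUntilSketch {

  // takeUntil(Predicate) emits the matching item and then completes.
  static List<Integer> withTakeUntil() {
    return Observable.just(1, 2, 3, 4)
        .takeUntil(integer -> integer == 2)
        .toList()
        .blockingGet();
  }

  // takeWhile(Predicate) stops before the first item that fails the predicate.
  static List<Integer> withTakeWhile() {
    return Observable.just(1, 2, 3, 4)
        .takeWhile(integer -> integer != 2)
        .toList()
        .blockingGet();
  }

  public static void main(String[] args) {
    System.out.println(withTakeUntil()); // prints [1, 2]
    System.out.println(withTakeWhile()); // prints [1]
  }
}
```

One difference from calling dispose() inside onNext: with takeUntil the downstream observer still receives onComplete, whereas after dispose() in the DisposableObserver version no terminal event is delivered.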
Hans Wurst answered Oct 05 '22 02:10


It's because in the first case you call

subscribe(Observer<? super Integer> observer)

with a DisposableObserver instance, while in the second case you call

subscribe(Consumer<? super Integer> onNext, Consumer<? super Throwable> onError, Action onComplete)

That means in the second case you don't hold a reference to a DisposableObserver, and therefore you can't call dispose() on it.
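
If the goal is lambda syntax while keeping access to dispose(), one possible workaround is a small factory that wraps a lambda in a DisposableObserver and passes the observer itself to the lambda as a Disposable. This is only a sketch assuming RxJava 2: the helper disposableObserver and the class name are hypothetical and not part of the library, and the error and completion handlers simply print as in the question.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiConsumer;
import io.reactivex.observers.DisposableObserver;
import java.util.ArrayList;
import java.util.List;

// Hypothetical class name, used only for this illustration.
class LambdaDisposableObserverSketch {

  // Hypothetical helper (not an RxJava API): the lambda receives each value
  // together with a Disposable that cancels this very subscription.
  static <T> DisposableObserver<T> disposableObserver(BiConsumer<T, Disposable> handler) {
    return new DisposableObserver<T>() {
      @Override
      public void onNext(T value) {
        try {
          handler.accept(value, this);
        } catch (Throwable t) {
          // Stop the upstream and route the failure to onError.
          dispose();
          onError(t);
        }
      }

      @Override
      public void onError(Throwable e) {
        System.out.println("onError()");
      }

      @Override
      public void onComplete() {
        System.out.println("onComplete()");
      }
    };
  }

  static List<Integer> run() {
    List<Integer> received = new ArrayList<>();
    DisposableObserver<Integer> observer = disposableObserver((value, self) -> {
      received.add(value);
      if (value == 2) {
        self.dispose(); // dispose directly from inside the lambda
      }
    });
    Observable.just(1, 2, 3, 4).subscribe(observer);
    return received;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [1, 2]
  }
}
```

Because the wrapper is still a DisposableObserver, the call resolves to the subscribe(Observer) overload, and the lambda only supplies the onNext logic.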

Lamorak answered Oct 05 '22 02:10