
How does RxJava doOnError and onErrorReturn work?

I made these unit tests, and the outcome is not what I expected at all:

// This one outputs "subscribe.onError"
@Test
public void observable_doOnError_subscribingToError() throws InterruptedException {
    Observable<String> obs = getErrorProducingObservable();
    obs.doOnError(throwable -> System.out.println("doOnError"));
    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> {},
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

// This one outputs "subscribe.onError"
@Test
public void observable_onErrorReturn() throws InterruptedException {
    Observable<String> obs = getErrorProducingObservable();
    obs.onErrorReturn(throwable -> "Yeah I got this");
    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> System.out.println("got: " + s),
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

private Observable<String> getErrorProducingObservable() {
    return Observable.create(subscriber -> {
        subscriber.onError(new RuntimeException("Somebody set up us the bomb"));
    });
}

So both output "subscribe.onError" - neither doOnError nor onErrorReturn seems to be called.

doOnError is documented as:

Modifies the source Observable so that it invokes an action if it calls onError.

I'm not sure how to interpret that, but I expected either "doOnError" to be output or "doOnError" followed by "subscribe.onError".

onErrorReturn is documented as:

Instructs an Observable to emit an item (returned by a specified function) rather than invoking onError if it encounters an error.

Hence I was expecting "got: Yeah I got this" as output from the latter test.

What gives?

Nilzor asked Sep 02 '15 08:09




1 Answer

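Both doOnError and onErrorReturn return a new Observable with the changed behaviour; they do not modify the Observable they are called on. Your tests discard that return value and subscribe to the original obs, so neither operator ever takes effect. I agree that the documentation may be a little misleading. Modify your tests like this to get the expected behaviour: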

// RxJava 1.x and JUnit 4 imports used by these tests
import org.junit.Test;
import rx.Observable;
import rx.schedulers.Schedulers;

// This one outputs "doOnError" followed by "subscribe.onError"
@Test
public void observable_doOnError_subscribingToError() throws InterruptedException {
    // Keep the Observable returned by doOnError and subscribe to that one.
    Observable<String> obs =
        getErrorProducingObservable()
            .doOnError(throwable -> System.out.println("doOnError"));

    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> {},
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

// This one outputs "got: Yeah I got this"
@Test
public void observable_onErrorReturn() throws InterruptedException {
    // Keep the Observable returned by onErrorReturn and subscribe to that one.
    Observable<String> obs =
        getErrorProducingObservable()
            .onErrorReturn(throwable -> "Yeah I got this");

    obs.subscribeOn(Schedulers.immediate()).observeOn(Schedulers.immediate()).subscribe(
        s -> System.out.println("got: " + s),
        error -> System.out.println("subscribe.onError")
    );
    Thread.sleep(300);
}

private Observable<String> getErrorProducingObservable() {
    return Observable.create(subscriber -> {
        subscriber.onError(new RuntimeException("Somebody set up us the bomb"));
    });
}
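
To see why the return value matters without pulling in RxJava, here is a minimal sketch of the same pattern in plain Java. The names `MiniObservableDemo`, `Mini`, `failing()` and `run()` are hypothetical and made up for this illustration. This toy is not how RxJava implements its operators; it only mimics the one property that matters here, namely that each operator wraps the source in a new object and leaves the source untouched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy illustration only: NOT RxJava and not its implementation.
class MiniObservableDemo {

    // Minimal observer with a value callback and an error callback.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable error);
    }

    // Hypothetical stand-in for Observable; instances are never mutated.
    static final class Mini<T> {
        private final Consumer<Observer<T>> onSubscribe;

        Mini(Consumer<Observer<T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // Returns a NEW Mini that runs the action, then forwards the error.
        Mini<T> doOnError(Consumer<Throwable> action) {
            Mini<T> source = this;
            return new Mini<T>(downstream -> source.onSubscribe.accept(new Observer<T>() {
                @Override public void onNext(T value) { downstream.onNext(value); }
                @Override public void onError(Throwable error) {
                    action.accept(error);
                    downstream.onError(error);
                }
            }));
        }

        // Returns a NEW Mini that emits a fallback value instead of the error.
        Mini<T> onErrorReturn(Function<Throwable, T> fallback) {
            Mini<T> source = this;
            return new Mini<T>(downstream -> source.onSubscribe.accept(new Observer<T>() {
                @Override public void onNext(T value) { downstream.onNext(value); }
                @Override public void onError(Throwable error) {
                    downstream.onNext(fallback.apply(error));
                }
            }));
        }

        void subscribe(Consumer<T> valueHandler, Consumer<Throwable> errorHandler) {
            onSubscribe.accept(new Observer<T>() {
                @Override public void onNext(T value) { valueHandler.accept(value); }
                @Override public void onError(Throwable error) { errorHandler.accept(error); }
            });
        }
    }

    // A source that fails immediately, like getErrorProducingObservable().
    static Mini<String> failing() {
        return new Mini<String>(o -> o.onError(new RuntimeException("boom")));
    }

    // Subscribes and records every callback as a line of text.
    static List<String> run(Mini<String> obs) {
        List<String> log = new ArrayList<>();
        obs.subscribe(s -> log.add("got: " + s), e -> log.add("subscribe.onError"));
        return log;
    }

    public static void main(String[] args) {
        Mini<String> obs = failing();
        obs.onErrorReturn(t -> "Yeah I got this");   // result discarded: no effect
        System.out.println(run(obs));                // [subscribe.onError]

        // Subscribing to the returned object picks up the new behaviour.
        System.out.println(run(failing().onErrorReturn(t -> "Yeah I got this")));
        // [got: Yeah I got this]

        List<String> log = new ArrayList<>();
        failing().doOnError(t -> log.add("doOnError"))
                 .subscribe(s -> log.add("got: " + s), e -> log.add("subscribe.onError"));
        System.out.println(log);                     // [doOnError, subscribe.onError]
    }
}
```

In the toy, as in the corrected tests, the fix is simply to subscribe to whatever the operator returned, usually by chaining the calls.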
marstran answered Oct 16 '22 06:10
