
How to handle exceptions thrown by observer's onNext in RxJava?

Consider the following example:

    Observable.range(1, 10).subscribe(i -> {
        System.out.println(i);

        if (i == 5) {
            throw new RuntimeException("oops!");
        }
    }, Throwable::printStackTrace);

This outputs numbers from 1 to 5 and then prints the exception.

What I want to achieve is to keep the observer subscribed and running after it throws an exception, i.e. print all numbers from 1 to 10.

I have tried using retry() and various other error-handling operators, but, as the documentation says, their purpose is to handle errors emitted by the observable itself.

The most straightforward solution is to wrap the whole body of onNext in a try-catch block, but that doesn't sound like a good solution to me. In a similar Rx.NET question, the proposed solution was to write an extension method that does the wrapping by creating a proxy observable. I tried to recreate it:

    Observable<Integer> origin = Observable.range(1, 10);
    Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
            origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));

    proxy.subscribe(i -> {
        System.out.println(i);

        if (i == 5) {
            throw new RuntimeException("oops!");
        }
    }, Throwable::printStackTrace);

This does not change anything, because RxJava itself wraps the subscriber into a SafeSubscriber. Using unsafeSubscribe to get around it doesn't seem to be a good solution either.

What can I do to solve this problem?

izstas asked Aug 16 '14 21:08



1 Answer

This is a common question that arises when learning Rx.

TL;DR

Your suggestion to put your exception handling logic in the subscriber is preferable to creating a generic observable wrapper.

Explanation

Remember that Rx is about pushing events to subscribers.

From the observable interface, it's clear there's not really anything an observable can know about its subscribers other than how long they took to handle an event, or the information contained in any thrown exceptions.

A generic wrapper to handle subscriber exceptions and carry on sending events to that subscriber is a bad idea.

Why? Well, the observable should only really know that the subscriber is now in an unknown failure state. To carry on sending events in this situation is unwise - perhaps, for example, the subscriber is in a condition where every event from this point forward is going to throw an exception and take a while to do it.

Once a subscriber has thrown an exception, there are only two viable courses of action for the observable:

  • Re-throw the exception
  • Implement generic handling: log the failure, stop sending events (of any kind) to that subscriber, clean up any resources held on its behalf, and carry on with any remaining subscriptions.
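
The second option can be sketched in plain Java, without any RxJava types. This is only an illustration under stated assumptions: `TinyPublisher`, `publish` and the `failures` list are hypothetical names standing in for an observable, its event delivery and a log, and the sketch ignores threading and completion signals.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an observable; this is not an RxJava type.
final class TinyPublisher<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    // Stands in for a log of subscriber failures.
    final List<RuntimeException> failures = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Pushes one event to every subscriber that is still registered.
    void publish(T item) {
        Iterator<Consumer<T>> it = subscribers.iterator();
        while (it.hasNext()) {
            Consumer<T> subscriber = it.next();
            try {
                subscriber.accept(item);
            } catch (RuntimeException e) {
                failures.add(e); // generic handling: record the failure
                it.remove();     // stop sending any further events to it
            }
        }
    }

    int subscriberCount() {
        return subscribers.size();
    }

    public static void main(String[] args) {
        TinyPublisher<Integer> publisher = new TinyPublisher<>();
        publisher.subscribe(i -> {
            if (i == 5) {
                throw new RuntimeException("oops!");
            }
        });
        publisher.subscribe(i -> System.out.println("healthy subscriber got " + i));
        for (int i = 1; i <= 10; i++) {
            publisher.publish(i);
        }
        System.out.println("failures logged: " + publisher.failures.size());
    }
}
```

Note that the publisher makes no attempt to interpret the exception; it only detaches the failed subscriber and keeps serving the others.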

Specific handling of subscriber exceptions would be a poor design choice; it would create inappropriate behavioural coupling between subscriber and observable. So if you want to be resilient to bad subscribers the two choices above are really the sensible limit of responsibility of the observable itself.

If you want your subscriber to be resilient and carry on, then you should absolutely wrap it in exception handling logic designed to handle the specific exceptions you know how to recover from (and perhaps to handle transient exceptions, logging, retry logic, circuit breaking etc.).
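
As a minimal sketch of that advice, here is a subscriber that recovers only from the one exception type it understands. It is plain Java so that it runs without RxJava on the classpath: `IntStream.rangeClosed(1, 10)` stands in for `Observable.range(1, 10)`, and `ResilientSubscriber`, `handle` and the choice of `IllegalStateException` as the "known" failure are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;

// Hypothetical subscriber that owns its own error handling.
final class ResilientSubscriber implements Consumer<Integer> {
    final List<Integer> seen = new ArrayList<>();      // every item that reached the handler
    final List<Integer> recovered = new ArrayList<>(); // items whose handling failed and was recovered

    @Override
    public void accept(Integer i) {
        try {
            handle(i);
        } catch (IllegalStateException e) {
            // Only the failure we know how to recover from is caught here;
            // any other exception still propagates to the caller.
            recovered.add(i);
        }
    }

    private void handle(int i) {
        seen.add(i);
        if (i == 5) {
            throw new IllegalStateException("oops!");
        }
    }

    public static void main(String[] args) {
        ResilientSubscriber subscriber = new ResilientSubscriber();
        IntStream.rangeClosed(1, 10).boxed().forEach(subscriber);
        System.out.println("seen: " + subscriber.seen);
        System.out.println("recovered from failures on: " + subscriber.recovered);
    }
}
```

With RxJava 1.x, a handler like this could presumably be passed as `Observable.range(1, 10).subscribe(subscriber::accept, Throwable::printStackTrace)`, since the exception never leaves `accept`.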

Only the subscriber itself will have the context to understand whether it is fit to receive further events in the face of failure.

If your situation warrants developing reusable error handling logic, put yourself in the mindset of wrapping the observer's event handlers rather than the observable - and do take care not to blindly carry on transmitting events in the face of failure. Release It!, whilst not written about Rx, is an entertaining software engineering classic that has plenty to say on this last point. If you've not read it, I highly recommend it.
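
One way such a reusable wrapper might look is sketched below, again in plain Java rather than against the RxJava API. `GuardedHandler`, its `recoverable` type and the `maxFailures` limit are hypothetical; the limit is a crude stand-in for the circuit-breaking idea, so the wrapper gives up instead of swallowing failures forever. The counter is not thread-safe, which assumes events arrive one at a time.

```java
import java.util.function.Consumer;

// Hypothetical reusable wrapper around an observer's event handler.
final class GuardedHandler<T> implements Consumer<T> {
    private final Consumer<T> delegate;
    private final Class<? extends RuntimeException> recoverable;
    private final int maxFailures;
    private int failures = 0;

    GuardedHandler(Consumer<T> delegate,
                   Class<? extends RuntimeException> recoverable,
                   int maxFailures) {
        this.delegate = delegate;
        this.recoverable = recoverable;
        this.maxFailures = maxFailures;
    }

    @Override
    public void accept(T item) {
        try {
            delegate.accept(item);
        } catch (RuntimeException e) {
            // Unknown failures propagate immediately; known ones are
            // swallowed only until the failure limit is exceeded.
            if (!recoverable.isInstance(e) || ++failures > maxFailures) {
                throw e;
            }
        }
    }

    // Number of recoverable failures observed so far.
    int failures() {
        return failures;
    }

    public static void main(String[] args) {
        GuardedHandler<Integer> handler = new GuardedHandler<Integer>(i -> {
            System.out.println(i);
            if (i == 5) {
                throw new IllegalStateException("oops!");
            }
        }, IllegalStateException.class, 3);
        for (int i = 1; i <= 10; i++) {
            handler.accept(i);
        }
        System.out.println("recovered failures: " + handler.failures());
    }
}
```

Because the wrapper rethrows once the limit is hit or the exception type is unfamiliar, the source still gets the chance to treat the subscriber as failed and stop sending it events.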

James World answered Sep 18 '22 06:09