
How to handle exceptions in OnNext when using ObserveOn?

My application terminates when an observer throws an exception in OnNext and I use ObserveOn(Scheduler.ThreadPool). Apart from making sure that OnNext never throws, the only way I have found to deal with this is the custom extension method below, combined with making sure that every ObserveOn is followed by an ExceptionToError.

    public static IObservable<T> ExceptionToError<T>(this IObservable<T> source) {
        var sub = new Subject<T>();
        source.Subscribe(i => {
            try {
                sub.OnNext(i);
            } catch (Exception err) {
                sub.OnError(err);
            }
        }
            , e => sub.OnError(e), () => sub.OnCompleted());
        return sub;
    }

However, this does not feel right. Is there a better way to deal with this?

Example

This program crashes because of an uncaught exception.

class Program {
    static void Main(string[] args) {
        try {
            var xs = new Subject<int>();

            xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x => {
                Console.WriteLine(x);
                if (x % 5 == 0) {
                    throw new System.Exception("Bang!");
                }
            }, ex => Console.WriteLine("Caught:" + ex.Message)); // <- not reached

            xs.OnNext(1);
            xs.OnNext(2);
            xs.OnNext(3);
            xs.OnNext(4);
            xs.OnNext(5);
        } catch (Exception e) {
            Console.WriteLine("Caught : " + e.Message); // <- also not reached
        } finally {

            Console.ReadKey();
        }
    }
}

Herman asked Jun 25 '12 00:06


4 Answers

I looked at the built-in SubscribeSafe method, which is supposed to solve this problem, but I couldn't make it work. It has a single overload, which accepts an IObserver<T>:

// Subscribes to the specified source, re-routing synchronous exceptions during
// invocation of the IObservable<T>.Subscribe(IObserver<T>) method to the
// observer's IObserver<T>.OnError(Exception) channel. This method is typically
// used when writing query operators.
public static IDisposable SubscribeSafe<T>(this IObservable<T> source,
    IObserver<T> observer);

I tried passing an observer created by the Observer.Create factory method, but the exceptions in the onNext handler continue crashing the process¹, just like they do with the normal Subscribe. So I ended up writing my own version of SubscribeSafe. This one accepts three handlers as arguments, and funnels any exceptions thrown by the onNext and onCompleted handlers to the onError handler.

/// <summary>Subscribes an element handler, an error handler, and a completion
/// handler to an observable sequence. Any exceptions thrown by the element or
/// the completion handler are propagated through the error handler.</summary>
public static IDisposable SubscribeSafe<T>(this IObservable<T> source,
    Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
    // Arguments validation omitted
    var disposable = new SingleAssignmentDisposable();
    disposable.Disposable = source.Subscribe(
        value =>
        {
            try { onNext(value); } catch (Exception ex) { onError(ex); disposable.Dispose(); }
        }, onError, () =>
        {
            try { onCompleted(); } catch (Exception ex) { onError(ex); }
        }
    );
    return disposable;
}

Beware, an unhandled exception in the onError handler will still crash the process!

¹ Only exceptions thrown when the handler is invoked asynchronously on the ThreadPool.

Theodor Zoulias answered Oct 07 '22 00:10


We're addressing this issue in Rx v2.0, starting with the RC release. You can read all about it on our blog at http://blogs.msdn.com/rxteam. It basically boils down to more disciplined error handling in the pipeline itself, combined with a SubscribeSafe extension method (to redirect errors during subscription into the OnError channel), and a Catch extension method on IScheduler (to wrap a scheduler with exception handling logic around scheduled actions).
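
As a rough illustration of the scheduler-wrapping idea, here is a minimal sketch. It assumes Rx 2.0 or later (the System.Reactive package), where Scheduler.Catch<TException> takes a handler that returns true when the exception counts as handled. The class name CatchSchedulerDemo and the use of Scheduler.Default in place of Scheduler.ThreadPool are choices made for this example only.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical demo class; only the Rx calls come from the library.
public static class CatchSchedulerDemo
{
    // Returns true if the scheduler-level handler saw the exception.
    public static bool Run()
    {
        var handled = new ManualResetEventSlim(false);

        // Wrap the scheduler so that exceptions escaping scheduled work
        // are passed to the handler instead of going unhandled.
        IScheduler guarded = Scheduler.Default.Catch<Exception>(ex =>
        {
            Console.WriteLine("Caught on scheduler: " + ex.Message);
            handled.Set();
            return true; // true means "handled, do not rethrow"
        });

        var xs = new Subject<int>();
        using (xs.ObserveOn(guarded).Subscribe(
            x =>
            {
                Console.WriteLine(x);
                if (x % 5 == 0) throw new Exception("Bang!");
            },
            ex => Console.WriteLine("Error in source: " + ex.Message)))
        {
            for (var i = 1; i <= 5; i++) xs.OnNext(i);

            // Wait for the asynchronous handler before disposing.
            return handled.Wait(TimeSpan.FromSeconds(5));
        }
    }
}
```

Note that the exception goes to the scheduler's handler, not to the subscription's onError callback, and that the subscription may stop receiving values once its handler has thrown, so this is a safety net rather than a substitute for handling errors in the observer.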

Concerning the ExceptionToError method proposed here, it has one flaw. The IDisposable subscription object can still be null when the callbacks run; there's a fundamental race condition. To work around this, you'd have to use a SingleAssignmentDisposable.

Bart De Smet answered Nov 16 '22 05:11


There's a difference between errors in subscription and errors in the observable. A quick test:

var xs = new Subject<int>();

xs.Subscribe(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); }, 
             ex => Console.WriteLine("Error in source: " + ex.Message));

Run with this and you'll get a nice handled error in the source:

xs.OnNext(1);
xs.OnNext(2);
xs.OnError(new Exception("from source"));

Run with this and you'll get an unhandled error in the subscription:

xs.OnNext(1);
xs.OnNext(2);
xs.OnNext(3);

What your solution has done is take errors in the subscription and make them errors in the source. And you've done this on the original stream, rather than on a per subscription basis. You may or may not have intended to do this, but it's almost certainly wrong.

The 'right' way to do it is to add the error handling you need directly to the subscribing action, which is where it belongs. If you don't want to modify your subscription functions directly, you can use a little helper:

public static Action<T> ActionAndCatch<T>(Action<T> action, Action<Exception> catchAction)
{
    return item =>
    {
        try { action(item); }
        catch (System.Exception e) { catchAction(e); }
    };
}

And now to use it, again showing the difference between the different errors:

xs.Subscribe(ActionAndCatch<int>(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); },
                                 ex => Console.WriteLine("Caught error in subscription: " + ex.Message)),
             ex => Console.WriteLine("Error in source: " + ex.Message));

Now we can handle errors in the source and errors in the subscription separately. Of course, any of these actions can be defined as a method, potentially making the above code as simple as:

xs.Subscribe(ActionAndCatch(Handler, ExceptionHandler), SourceExceptionHandler);

Edit

In the comments we then discussed the case where errors in the subscription point to errors in the stream itself, and you wouldn't want other subscribers on that stream. This is a completely different type of issue. I would be inclined to write an observable Validate extension to handle this scenario:

public static IObservable<T> Validate<T>(this IObservable<T> source, Predicate<T> valid)
{
    return Observable.Create<T>(o => {
        return source.Subscribe(
            x => {
                if (valid(x)) o.OnNext(x);
                else       o.OnError(new Exception("Could not validate: " + x));
            }, e => o.OnError(e), () => o.OnCompleted()
        );
    });
}

It is then simple to use, without mixing metaphors (errors occur only in the source):

xs
.Validate(x => x != 3)
.Subscribe(x => Console.WriteLine(x),
             ex => Console.WriteLine("Error in source: " + ex.Message));

If you still want to suppress exceptions thrown in Subscribe, use one of the other methods discussed.

yamen answered Nov 16 '22 06:11


Your current solution is not ideal. As stated by one of the Rx people here:

Rx operators do not catch exceptions that occur in a call to OnNext, OnError, or OnCompleted. This is because we expect that (1) the observer implementor knows best how to handle those exceptions and we can't do anything reasonable with them and (2) if an exception occurs then we want that to bubble out and not be handled by Rx.

Your current solution gets the IObservable to handle errors thrown by the IObserver, which doesn't make sense as semantically the IObservable should have no knowledge of the things observing it. Consider the following example:

var errorFreeSource = new Subject<int>();
var sourceWithExceptionToError = errorFreeSource.ExceptionToError();
var observerThatThrows = Observer.Create<int>(x =>
  {
      if (x % 5 == 0)
          throw new Exception();
  },
  ex => Console.WriteLine("There's an argument that this should be called"),
  () => Console.WriteLine("OnCompleted"));
var observerThatWorks = Observer.Create<int>(
    x => Console.WriteLine("All good"),
    ex => Console.WriteLine("But definitely not this"),
    () => Console.WriteLine("OnCompleted"));
sourceWithExceptionToError.Subscribe(observerThatThrows);
sourceWithExceptionToError.Subscribe(observerThatWorks);
errorFreeSource.OnNext(1);
errorFreeSource.OnNext(2);
errorFreeSource.OnNext(3);
errorFreeSource.OnNext(4);
errorFreeSource.OnNext(5);
Console.ReadLine();

Here there is no issue with the source or with observerThatWorks, yet its OnError will be called because of an unrelated error in another observer. To stop exceptions on a different thread from ending the process, you have to catch them on that thread, so put a try/catch block in your observers.
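
One way to keep that try/catch in the observer itself is sketched below. GuardedObserver is a hypothetical helper written for this example, not part of Rx. It uses only the IObserver<T> interface from the base class library, and reports exceptions thrown by its own onNext handler to a separate callback, so they never travel back into the source or reach other observers.

```csharp
using System;

// Hypothetical helper (not an Rx type): an observer that guards its own
// OnNext handler with try/catch.
public sealed class GuardedObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onHandlerFault;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public GuardedObserver(
        Action<T> onNext,
        Action<Exception> onHandlerFault,
        Action<Exception> onError = null,
        Action onCompleted = null)
    {
        if (onNext == null) throw new ArgumentNullException(nameof(onNext));
        if (onHandlerFault == null) throw new ArgumentNullException(nameof(onHandlerFault));
        _onNext = onNext;
        _onHandlerFault = onHandlerFault;
        _onError = onError ?? (_ => { });
        _onCompleted = onCompleted ?? (() => { });
    }

    public void OnNext(T value)
    {
        // Caught on whichever thread delivers the value, so a throwing
        // handler cannot take the process down.
        try { _onNext(value); }
        catch (Exception ex) { _onHandlerFault(ex); }
    }

    // Errors from the source stay on their own channel.
    public void OnError(Exception error) { _onError(error); }

    public void OnCompleted() { _onCompleted(); }
}
```

With Rx referenced, an instance could be passed straight to Subscribe, for example xs.ObserveOn(Scheduler.ThreadPool).Subscribe(new GuardedObserver<int>(Handle, LogFault)), where Handle and LogFault stand for your own methods. An exception thrown by the fault callback itself is still unhandled.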

Matthew Finlay answered Nov 16 '22 04:11