Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RX and Exception Handling

I'm using the reactive extensions for a kind of a in process message bus. The implementation is quite simple.

Register looks like

public IDisposable Register<T>(Action<T> action) where T : IMessage
{
   return this.subject
        .OfType<T>()
        .Subscribe(action);
}

And send simply:

private void SendMessage(IMessage message)
    {
        this.subject.OnNext(message);
    }

However i'm now having some trouble with the exception behaviour of RX. One an exception is thrown in a registered/subscribed action - the Observable 'stream' is broken and will not subscribe anymore. Sine this message bus is used for two parts of the application to communicate i need to ensure that such a stream is never broken even if an unexpected exception is thrown.

like image 384
LaurinSt Avatar asked Nov 21 '12 12:11

LaurinSt


1 Answers

If you need to ensure that the stream is never broken by an exception, then you need to have another channel for the exceptions.

This behavior is not completely unexpected. From the documentation for the IObserver<T> interface:

The OnError method, which is typically called by the provider to indicate that data is unavailable, inaccessible, or corrupted, or that the provider has experienced some other error condition.

Given this, if the stream is unavailable, corrupted, etc, you definitely want the stream to be "faulted" (this is analagous to a channel being faulted in WCF); the state is indeterminate so you can't rely on anything else that comes from the IObservable<T> implementation; so why should there be an expectation that there will be any more observations?

That said, you have a some options:

Swallow the exception

You'd have to wrap the action delegate that you pass into your Register function, like so:

public IDisposable Register<T>(Action<T> action) where T : IMessage
{
   return this.subject
        .OfType<T>()
        .Subscribe(t => {
            // Execute action
            try { action(t); }
            catch { }
         });
}

This, of course, might not be desirable, as you might be throwing away exceptions which impact your program (or, you might know exactly what's going on here, and want to skip them), but it can be used to build on the next solution.

Provide an action to take when an exception is thrown

Using the above as the base, you can ask for an Action<T, Exception> which will be called when an exception is thrown.

public IDisposable Register<T>(Action<T> action, 
    Action<T, Exception> errorHandler) where T : IMessage
{
   return this.subject
        .OfType<T>()
        .Subscribe(t => {
            // Execute action
            try { action(t); }
            catch (Exception e) { errorHandler(t, e); }
         });
}

Now, when an exception is thrown from action, it will be passed to your exception handler without breaking the stream.

The above can easily be overloaded to provide the behavior which will swallow the exception (which again, may or may not serve your purposes):

public IDisposable Register<T>(Action<T> action) where T : IMessage
{
    // Call the overload, don't do anything on
    // exception.
    return Register(action, (t, e) => { });
}
like image 51
casperOne Avatar answered Sep 30 '22 13:09

casperOne