Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Catching exceptions which may be thrown from a Subscription OnNext Action

I'm somewhat new to Rx.NET. Is it possible to catch an exception which may be thrown by any of the subscribers? Take the following...

handler.FooStream.Subscribe(
            _ => throw new Exception("Bar"),
            _ => { });

Currently I'm catching on a per subscription basis with an instance of the following. The implementation of which just uses a ManualResetEvent to wake up a waiting thread.

public interface IExceptionCatcher
{
    Action<T> Exec<T>(Action<T> action);
}

and using it like so...

handler.FooStream.Subscribe(
            _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred
            _ => { });

I feel like there must be some better way. Are all of the error handling capabilities in Rx.NET specifically for dealing with errors observables?

EDIT: Per request, my implementation is https://gist.github.com/1409829 (interface and implementation separated into different assemblies in prod code). Feedback is welcome. This may seem silly, but I'm using castle windsor to manage many different Rx subscribers. This exception catcher is registered with the container like this

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher));

It would then be used like this where observable is instance of IObservable:

var exceptionCatcher =
    new ExceptionCatcher(e =>
                                {
                                    Logger.FatalException(
                                        "Exception caught, shutting down.", e);
                                    // Deal with unmanaged resources here
                                }, false);


/* 
 * Normally the code below exists in some class managed by an IoC container.
 * 'catcher' would be provided by the container.
 */
observable /* do some filtering, selecting, grouping etc */
    .SubscribeWithExceptionCatching(processItems, catcher);
like image 653
drstevens Avatar asked Aug 26 '11 20:08

drstevens


1 Answers

The built-in Observable operators do not do what you are asking for by default (much like events), but you could make an extension method that would do this.

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
                                this IObservable<T> source
                               ) where TException : Exception
{
    return Observable.Create<T>(
        o => source.Subscribe(
            v => { try { o.OnNext(v); }
                   catch (TException) { }
            },
            ex => o.OnError(ex),
            () => o.OnCompleted()
            ));
}

Then any observable could be wrapped by this method to get the behavior you described.

like image 67
Gideon Engelberth Avatar answered Sep 22 '22 04:09

Gideon Engelberth