I'm somewhat new to Rx.NET. Is it possible to catch an exception which may be thrown by any of the subscribers? Take the following...
handler.FooStream.Subscribe(
_ => throw new Exception("Bar"),
_ => { });
Currently I'm catching on a per subscription basis with an instance of the following. The implementation of which just uses a ManualResetEvent to wake up a waiting thread.
public interface IExceptionCatcher
{
Action<T> Exec<T>(Action<T> action);
}
and using it like so...
handler.FooStream.Subscribe(
_exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred
_ => { });
I feel like there must be some better way. Are all of the error handling capabilities in Rx.NET specifically for dealing with errors observables?
EDIT: Per request, my implementation is https://gist.github.com/1409829 (interface and implementation separated into different assemblies in prod code). Feedback is welcome. This may seem silly, but I'm using castle windsor to manage many different Rx subscribers. This exception catcher is registered with the container like this
windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher));
It would then be used like this where observable
is instance of IObservable:
var exceptionCatcher =
new ExceptionCatcher(e =>
{
Logger.FatalException(
"Exception caught, shutting down.", e);
// Deal with unmanaged resources here
}, false);
/*
* Normally the code below exists in some class managed by an IoC container.
* 'catcher' would be provided by the container.
*/
observable /* do some filtering, selecting, grouping etc */
.SubscribeWithExceptionCatching(processItems, catcher);
The built-in Observable operators do not do what you are asking for by default (much like events), but you could make an extension method that would do this.
public static IObservable<T> IgnoreObserverExceptions<T, TException>(
this IObservable<T> source
) where TException : Exception
{
return Observable.Create<T>(
o => source.Subscribe(
v => { try { o.OnNext(v); }
catch (TException) { }
},
ex => o.OnError(ex),
() => o.OnCompleted()
));
}
Then any observable could be wrapped by this method to get the behavior you described.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With