I am using a simple Subject<object> to implement the Event Aggregator pattern in a web application, like so:
    public class EventAggregator
    {
        private readonly ISubject<object, object> _subject = Subject.Synchronize(new Subject<object>());

        public IObservable<T> GetEvent<T>()
        {
            return _subject.AsObservable().OfType<T>();
        }

        public void Publish<TEvent>(TEvent sampleEvent)
        {
            _subject.OnNext(sampleEvent);
        }
    }
When an operator or subscriber throws an exception, I want to log it, then ignore it and keep publishing events. Essentially, I want the stream to "keep going" in the case of some unexpected behaviour, because the Event Aggregator has a singleton lifetime in the context of the application.
In this question the answer given is to create a deferred observable and use Retry(), but I do not want a cold observable here.
The solution I came up with is to use Catch and a try-catch in the subscriber method, which I wrapped up into an extension:
    public static IDisposable SubscribeSwallowExceptions<T>(this IObservable<T> source, Action<T> subscription, Action<Exception> log)
    {
        source = source.Catch((Exception e) => { log(e); return source; });
        return source.Subscribe(x =>
        {
            try { subscription(x); }
            catch (Exception e) { log(e); }
        });
    }
I know that "catch-all" exception handling is generally frowned upon, but in this case I am not sure what other options I have, given that I want the subscription to remain even when an exception is thrown. I don't know what types of exceptions might occur, as I do not yet know what work will be done when processing the stream.
Is this an acceptable way to deal with potential exceptions, and can you foresee any issues that might trip me up with this approach?
This is a problematic solution. Once a subscriber has thrown an exception, you must really assume they are dead in the water at that point (there's little else you can sensibly do) and not call them any further. With your approach, a subscriber that has "gone bad" will keep being sent events and could potentially throw exceptions ad infinitum.
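To make that concern concrete, here is a minimal sketch that reuses the extension method from the question unchanged. The `SwallowDemo` class and the `CountLoggedFailures` helper are hypothetical names introduced only for this illustration, and the sketch assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwallowDemo
{
    // The extension method from the question, copied as-is.
    public static IDisposable SubscribeSwallowExceptions<T>(
        this IObservable<T> source, Action<T> subscription, Action<Exception> log)
    {
        source = source.Catch((Exception e) => { log(e); return source; });
        return source.Subscribe(x =>
        {
            try { subscription(x); }
            catch (Exception e) { log(e); }
        });
    }

    // Hypothetical helper: publishes eventCount events to a subscriber that
    // always throws, and returns how many exceptions were logged.
    public static int CountLoggedFailures(int eventCount)
    {
        var subject = new Subject<int>();
        var logged = 0;

        subject.SubscribeSwallowExceptions<int>(
            _ => { throw new InvalidOperationException("subscriber is broken"); },
            e => logged++);

        for (var i = 0; i < eventCount; i++)
        {
            subject.OnNext(i);
        }

        return logged;
    }
}
```

Because the subscription is never removed, the broken subscriber is invoked again for every event, so the number of logged exceptions grows with the number of events published.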
Also, you can't really rely on subscribers using your extension method. I would rewrite your GetEvent<T> method as follows (obviously replace my Console.WriteLine with some logging). This approach disposes only the bad subscriber's subscription and keeps everyone else running.
    public IObservable<T> GetEvent<T>()
    {
        return Observable.Create<T>(o =>
        {
            var source = _subject.OfType<T>();
            // Holds this subscriber's own subscription so it can be torn down in isolation.
            var m = new SingleAssignmentDisposable();
            m.Disposable = source.Subscribe(
                x => {
                    try {
                        o.OnNext(x);
                    }
                    catch(Exception e) {
                        // The subscriber threw: log it and stop sending it events.
                        Console.WriteLine(e);
                        m.Dispose();
                    }
                },
                e => {
                    try {
                        o.OnError(e);
                    }
                    catch(Exception ex) {
                        Console.WriteLine(ex);
                    }
                    finally {
                        m.Dispose();
                    }
                },
                () => {
                    try {
                        o.OnCompleted();
                    }
                    catch(Exception e) {
                        Console.WriteLine(e);
                    }
                    finally {
                        m.Dispose();
                    }
                }
            );
            return m;
        });
    }
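One way to swap Console.WriteLine for real logging, and to reuse the same wrapper outside GetEvent<T>, is to factor it into an extension method that takes a logging callback. The following is only a sketch under assumptions: `IsolateSubscriberFaults`, `IsolatingObservableExtensions`, and `Demo` are hypothetical names and not part of Rx, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class IsolatingObservableExtensions
{
    // Hypothetical helper (not an Rx operator): a subscriber whose handler
    // throws is logged and unsubscribed; other subscribers are unaffected.
    public static IObservable<T> IsolateSubscriberFaults<T>(
        this IObservable<T> source, Action<Exception> log)
    {
        return Observable.Create<T>(o =>
        {
            var m = new SingleAssignmentDisposable();
            m.Disposable = source.Subscribe(
                x =>
                {
                    try { o.OnNext(x); }
                    catch (Exception e) { log(e); m.Dispose(); }
                },
                e =>
                {
                    try { o.OnError(e); }
                    catch (Exception ex) { log(ex); }
                    finally { m.Dispose(); }
                },
                () =>
                {
                    try { o.OnCompleted(); }
                    catch (Exception e) { log(e); }
                    finally { m.Dispose(); }
                });
            return m;
        });
    }
}

public static class Demo
{
    // Publishes two events to one throwing and one healthy subscriber, then
    // returns what the healthy one received and how many exceptions were logged.
    public static (List<string> Received, int Logged) Run()
    {
        var subject = new Subject<object>();
        var logged = 0;
        var received = new List<string>();

        var events = subject.OfType<string>().IsolateSubscriberFaults(e => logged++);

        events.Subscribe(s => { throw new InvalidOperationException("bad subscriber"); });
        events.Subscribe(s => received.Add(s));

        subject.OnNext("first");
        subject.OnNext("second");

        return (received, logged);
    }
}
```

In this sketch the throwing subscriber should be logged once, on the first event, and then dropped, while the healthy subscriber still receives both events. Inside the aggregator, GetEvent<T> could then return `_subject.OfType<T>().IsolateSubscriberFaults(log)`, where `log` stands for whatever logging delegate the application already uses.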