Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Swallowing exceptions from a long-running event aggregator observable

I am using a simple Subject<object> to implement the Event Aggregator pattern in a web application like so:

public class EventAggregator
{
    private readonly ISubject<object, object> _subject = Subject.Synchronize(new Subject<object>());
    public IObservable<T> GetEvent<T>()
    {
        return _subject.AsObservable().OfType<T>();
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        _subject.OnNext(sampleEvent);
    }
}

When an operator or subscriber throws an exception, I want to log it then ignore it and keep publishing events - essentially I want the stream to "keep going" in the case of some unexpected behaviour, as the Event Aggregator has a singleton lifetime in the context of the application.

In this question the answer given is to create a deferred observable and use Retry(), but I do not want a cold observable here.

The solution I came up with is to use Catch and a try-catch in the subscriber method, which I wrapped up into an extension:

public static IDisposable SubscribeSwallowExceptions<T>(this IObservable<T> source, Action<T> subscription, Action<Exception> log)
{
    source = source.Catch((Exception e) => { log(e); return source; });

    return source.Subscribe(x =>
    {
        try { subscription(x); }
        catch (Exception e) { log(e); }
    });
}

I know that "catch-all" exception handling is generally frowned upon, but in this case I am not sure what other options I have, given that I want the subscription to remain even when an exception is thrown. I don't know what types of exceptions might occur, as I do not yet know what work will be done when processing the stream.

Is this an acceptable way to deal with potential exceptions, and can you foresee any issues that might trip me up with this approach?

like image 224
Alex Avatar asked Mar 17 '14 11:03

Alex


1 Answers

This is a problematic solution. Once a subscriber has thrown an exception, you must really assume they are dead in the water at that point (there's little else you can sensibly do) and not call them any further. With your approach, a subscriber that has "gone bad" will keep being sent events and could potentially throw exceptions ad infinitum.

Also, you can't really rely on subscribers using your extension method. I would rewrite your GetEvent<T> method as follows (obviously replace my Console.WriteLine with some logging). This approach will dispose the subscription due just to the bad subscriber, and keep everyone else running.

public IObservable<T> GetEvent<T>()
{       
    return Observable.Create<T>(o =>
    {
        var source = _subject.OfType<T>();
        var m = new SingleAssignmentDisposable();
        m.Disposable = source.Subscribe(
            x => {
                try {
                    o.OnNext(x);                        
                }
                catch(Exception e) {
                    Console.WriteLine(e);
                    m.Dispose();
                }
            },
            e => {
                try {
                    o.OnError(e);                       
                }
                catch(Exception ex) {
                    Console.WriteLine(ex);
                }
                finally {
                    m.Dispose();
                }
            },
            () => {
                try {
                    o.OnCompleted();                        
                }
                catch(Exception e) {
                    Console.WriteLine(e);
                }
                finally {
                    m.Dispose();
                }
            }               
        );

        return m;
    });
}
like image 62
James World Avatar answered Oct 28 '22 13:10

James World