How to handle exceptions from asynchronous methods within a SelectMany statement

I am trying to process some tasks asynchronously using Rx, e.g.

var list = Enumerable.Range(0, 100)
    .ToObservable()
    .SelectMany(x => Observable.Start(() => {
        Console.WriteLine("Processing {0} ...", x);

        Thread.Sleep(100 * (x % 3));

        if (x > 90) {
            Console.WriteLine("Processing exception {0} > 90", x);
            throw new Exception("Value too large");
        }
        Console.WriteLine("Processing {0} completed.", x);
        return x;
    }))
    .Subscribe(
        x => { Console.WriteLine("Next [{0}]", x); },
        e => {
            Console.WriteLine("Exception:");
            Console.WriteLine(e.Message);
        },
        () => { Console.WriteLine("Complete"); }
    );

The problem I have with this code is that the exception is not passed to the subscriber. So, after a lot of trying I gave up and decided to ask this simple question:

How do you handle the exceptions raised from within asynchronous methods within a SelectMany statement?

To be clear, the final implementation is a synchronous function call that may or may not throw an exception. The goal is to pass the exception on to the subscriber so that it can be processed further (in this specific case, a message will be shown to the user).

Edit

I moved my findings down to an answer, so that I can mark this question as answered. Personally, I do not agree with self answering ... but sometimes there is no other way, so sorry for it.

AxelEckenberger asked Mar 05 '12 22:03


2 Answers

Use Materialize to convert your OnError / OnCompleted messages into notifications.

For example,

observable.SelectMany(x => Observable.Start(fn).Materialize())

will deliver the error or completion, wrapped in a notification, to your actual subscription point further down the line, instead of the error terminating the sequence inside the SelectMany.

This is useful for most Async call operations because the method either fails or completes.
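
A minimal sketch of the Materialize approach, assuming the System.Reactive package (Rx 2.0 or later, for the blocking Wait operator). The Process method and the class name are hypothetical stand-ins for the real work; they are not part of the question's code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;

public static class MaterializeSketch
{
    // Hypothetical worker: fails for inputs above 3.
    public static int Process(int x)
    {
        if (x > 3) throw new InvalidOperationException("Value too large: " + x);
        return x;
    }

    // Runs every item on a background thread and collects all notifications.
    public static IList<Notification<int>> Collect()
    {
        return Enumerable.Range(0, 6)
            .ToObservable()
            // Each inner error becomes an OnError *value*, so the outer
            // sequence keeps running instead of terminating on the first failure.
            .SelectMany(x => Observable.Start(() => Process(x)).Materialize())
            .ToList()
            .Wait(); // blocks until the outer sequence completes
    }

    public static void Main()
    {
        foreach (var n in Collect())
        {
            switch (n.Kind)
            {
                case NotificationKind.OnNext:
                    Console.WriteLine("Next [{0}]", n.Value);
                    break;
                case NotificationKind.OnError:
                    Console.WriteLine("Error: {0}", n.Exception.Message);
                    break;
                // NotificationKind.OnCompleted marks the end of one inner call.
            }
        }
    }
}
```

The order of the notifications is not deterministic, because the inner calls run concurrently. Note that the subscriber now has to inspect each notification itself; its own OnError handler is no longer invoked for failures of individual items.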

Asti answered Nov 11 '22 23:11


The answer

Actually, the code is working correctly. However, the debugger breaks at the exceptions because the async operations are still being executed in the background, or at least those that had already been started when the first exception occurred. That threw me! If you run the code without a debugger attached, the exceptions from those background operations are swallowed. So I guess the problem was really in front of the computer :-)

Still, some clarification on Observable.Start is in order. I assumed, correctly as it turns out, that the implementation should already have some error handling built in; see Background.

Background

Observable.Start is a convenience method that uses the Observable.ToAsync method to turn a function/action into an async operation. If you look at the implementation of ToAsync, you'll see that it already catches the exception and forwards it through OnError.

public static Func<IObservable<TResult>> ToAsync<TResult>(this Func<TResult> function, IScheduler scheduler) {
    if (function != null) {
        if (scheduler != null) {
            return () => {
                AsyncSubject<TResult> asyncSubject = new AsyncSubject<TResult>();
                scheduler.Schedule(() => {
                    TResult result = default(TResult);
                    try {
                        result = function();
                    } catch (Exception exception1) {
                        Exception exception = exception1;
                        asyncSubject.OnError(exception);
                        return;
                    }
                    asyncSubject.OnNext(result);
                    asyncSubject.OnCompleted();
                });
                return asyncSubject.AsObservable<TResult>();
            };
        } else {
            throw new ArgumentNullException("scheduler");
        }
    } else {
        throw new ArgumentNullException("function");
    }
}
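
To see this forwarding from the subscriber's side, here is a small sketch, again assuming the System.Reactive package. The class name and the Run helper are made up for illustration. A function that throws inside Observable.Start ends up in the subscriber's OnError handler rather than escaping on the background thread.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class StartErrorSketch
{
    // Subscribes to a failing Observable.Start call and reports what arrived.
    public static string Run()
    {
        string observed = null;
        var done = new ManualResetEventSlim();

        // The explicit <int> selects the Func<int> overload for a lambda that only throws.
        Observable.Start<int>(() => { throw new InvalidOperationException("Value too large"); })
            .Subscribe(
                x => { observed = "Next " + x; },
                e => { observed = "Error: " + e.Message; done.Set(); },
                () => done.Set());

        done.Wait(); // block until OnError or OnCompleted has fired
        return observed;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints "Error: Value too large"
    }
}
```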

AxelEckenberger answered Nov 11 '22 21:11