I am trying to process some tasks asynchronously using Rx, e.g.
var list = Enumerable.Range(0, 100)
.ToObservable()
.SelectMany(x => Observable.Start(() => {
Console.WriteLine("Processing {0} ...", x);
Thread.Sleep(100 * x % 3);
if (x > 90) {
Console.WriteLine("Procesing exception {0} > 90", x);
throw new Exception("Value too large");
}
Console.WriteLine("Processing {0} completed.", x);
return x;
}))
.Subscribe(
x => { Console.WriteLine("Next [{0}]", x); },
e => {
Console.WriteLine("Exception:");
Console.WriteLine(e.Message);
},
() => { Console.WriteLine("Complete"); }
);
The problem I have with this code is that the exception is not passed to the subscriber. So, after a lot of trying I gave up and decided to ask this simple question:
How do you handle the exceptions raised from within asynchronous methods within a SelectMany
statement?
Just to make it clear, the final implementation is a synchroneous function call that may or may not throw an exception. The goal is to pass it on to the subscriber so that it can be further processed (in the specific case a message will be shown to the user).
Edit
I moved my findings down to an answer, so that I can mark this question as answered. Personally, I do not agree with self answering ... but sometimes there is no other way, so sorry for it.
Use Materialize
to convert your OnError / OnCompleted messages into notifications.
For example,
observable.SelectMany(x => Observable.Start(fn).Materialize())
will get you the error / completion wrapped in a notification to be handled in your actual subscription point way down the line, as opposed to the error being terminated inside the SelectMany.
This is useful for most Async call operations because the method either fails or completes.
The answer
Actually the code is working correctly. However, the debugger breaks at the exceptions as the async operations are still executed in the background - well at least those that were already started when the first exception occurred. Threw me! If you run the code without debugger the exceptions are swallowed.So I guess the problem was really in front of the computer :-)
Still some clarifications on the Observable.Start
as I assumed - an this correctly - that the implemtation should have actually some error handling implemented ... see Background.
Background
Observable.Start
is a convenience method that uses the Observable.ToAsync
method to turn a function/acion into an async operation. If you look at the implementation of the method you'll see that it already does the exception handling/forwarding.
public static Func<IObservable<TResult>> ToAsync<TResult>(this Func<TResult> function, IScheduler scheduler) {
if (function != null) {
if (scheduler != null) {
return () => {
AsyncSubject<TResult> asyncSubject = new AsyncSubject<TResult>();
scheduler.Schedule(() => {
TResult result = default(TResult);
try {
result = function();
} catch (Exception exception1) {
Exception exception = exception1;
asyncSubject.OnError(exception);
return;
}
asyncSubject.OnNext(result);
asyncSubject.OnCompleted();
});
return asyncSubject.AsObservable<TResult>();
};
} else {
throw new ArgumentNullException("scheduler");
}
} else {
throw new ArgumentNullException("function");
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With