I want to call an async function for each item in an observable. As answered here, the solution is to use SelectMany. However, if the async method throws, the subscription will terminate. I have the following solution, which seems to work:
obs.SelectMany(x => Observable
.FromAsync(() => RunAsync())
.Catch(Observable.Empty<string>()));
Is there a more idiomatic solution?
There is a standard way to be able to observe the exceptions that occur in your RunAsync call, and that's using .Materialize().
The .Materialize() method turns an IObservable<T> sequence into a IObservable<Notification<T>> sequence where you can reason against the OnNext, OnError, and OnCompleted calls.
I wrote this query:
var obs = Observable.Range(0, 10);
obs
.SelectMany(x =>
Observable
.FromAsync(() => RunAsync())
.Materialize())
.Where(x => x.Kind != NotificationKind.OnCompleted)
.Select(x => x.HasValue ? x.Value : (x.Exception.Message + "!"))
.Subscribe(x => x.Dump());
With this supporting code:
private int counter = 0;
private Random rnd = new Random();
private System.Threading.Tasks.Task<string> RunAsync()
{
return System.Threading.Tasks.Task.Factory.StartNew(() =>
{
System.Threading.Interlocked.Increment(ref counter);
if (rnd.NextDouble() < 0.3)
{
throw new Exception(counter.ToString());
}
return counter.ToString();
});
}
When I run it I get this kind of output:
2
4
5
1!
6
7
3!
10
8!
9
Each of the lines ending in ! are calls to RunAsync that resulted in an exception.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With