
Rx: subscribe with async function and ignore errors

I want to call an async function for each item in an observable. As answered here, the solution is to use SelectMany. However, if the async method throws, the subscription will terminate. I have the following solution, which seems to work:

obs.SelectMany(x => Observable
    .FromAsync(() => RunAsync())
    .Catch(Observable.Empty<string>()));

Is there a more idiomatic solution?

Victor Grigoriu asked Oct 31 '25 12:10


1 Answer

The standard way to observe the exceptions thrown by your RunAsync call is to use .Materialize().

The .Materialize() method turns an IObservable<T> sequence into an IObservable<Notification<T>> sequence, in which the OnNext, OnError, and OnCompleted calls become ordinary values that you can filter and project.
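To make that concrete, here is a minimal sketch of what Materialize emits for a sequence that produces one value and then fails. The class name, the KindsOf helper, and the "boom" exception are made up for illustration; only Return, Throw, Concat, and Materialize come from Rx itself.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class MaterializeSketch
{
    // Hypothetical helper: records the kind of every notification a source produces.
    public static List<NotificationKind> KindsOf(IObservable<string> source)
    {
        var kinds = new List<NotificationKind>();
        // The materialized sequence never errors, so an OnNext handler is enough.
        source.Materialize().Subscribe(n => kinds.Add(n.Kind));
        return kinds;
    }

    public static void Main()
    {
        // One value followed by an error; both sources emit synchronously on subscribe.
        var failing = Observable
            .Return("ok")
            .Concat(Observable.Throw<string>(new InvalidOperationException("boom")));

        foreach (var kind in KindsOf(failing))
            Console.WriteLine(kind); // OnNext, then OnError
    }
}
```

The error arrives as a Notification value through OnNext, and the materialized sequence then completes normally, which is why a subscriber downstream of Materialize is not torn down.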

I wrote this query:

var obs = Observable.Range(0, 10);

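obs
    .SelectMany(x =>
        Observable
            .FromAsync(() => RunAsync())
            // Turn OnNext/OnError/OnCompleted into Notification<string> values,
            // so a failed call no longer terminates the outer sequence.
            .Materialize())
    // Each successful inner sequence also ends with an OnCompleted notification; drop those.
    .Where(x => x.Kind != NotificationKind.OnCompleted)
    .Select(x => x.HasValue ? x.Value : (x.Exception.Message + "!"))
    .Subscribe(x => x.Dump()); // Dump() is LINQPad's output helper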

With this supporting code:

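private int counter = 0;
private readonly Random rnd = new Random();

private System.Threading.Tasks.Task<string> RunAsync()
{
    return System.Threading.Tasks.Task.Factory.StartNew(() =>
    {
        // Use the value returned by Increment; re-reading the field
        // could observe an update made by another task.
        var id = System.Threading.Interlocked.Increment(ref counter);
        double roll;
        lock (rnd) { roll = rnd.NextDouble(); } // Random is not thread-safe
        if (roll < 0.3)
        {
            throw new Exception(id.ToString());
        }
        return id.ToString();
    });
}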

When I run it I get this kind of output:

2
4
5
1!
6
7
3!
10
8!
9

Each line ending in ! is a call to RunAsync that threw an exception.
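If the goal is simply to ignore failures, as in the question, the same Materialize pattern can keep only the OnNext notifications. The following is a rough sketch under assumptions: this RunAsync takes the item as a parameter and fails for odd inputs purely so the result is predictable, and SuccessesOnly is a hypothetical helper name, not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class IgnoreErrorsSketch
{
    // Hypothetical stand-in for RunAsync: fails for odd inputs.
    static Task<string> RunAsync(int x) =>
        x % 2 == 1
            ? Task.FromException<string>(new Exception(x.ToString()))
            : Task.FromResult(x.ToString());

    // Hypothetical helper: runs RunAsync per item and keeps only successful results.
    public static IObservable<string> SuccessesOnly(IObservable<int> source) =>
        source
            .SelectMany(x => Observable.FromAsync(() => RunAsync(x)).Materialize())
            .Where(n => n.Kind == NotificationKind.OnNext) // drop OnError and OnCompleted
            .Select(n => n.Value);

    public static void Main()
    {
        // ToList().Wait() blocks until the sequence completes; fine for a demo.
        IList<string> results = SuccessesOnly(Observable.Range(0, 6)).ToList().Wait();
        Console.WriteLine(string.Join(",", results));
    }
}
```

Compared with Catch(Observable.Empty<string>()), this is not shorter, but it leaves the exceptions available in the stream up to the Where, so logging them later only needs one extra operator.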

Enigmativity answered Nov 04 '25 05:11


