I've been happily making some API calls in a WPF app using RX in the following manner:
    IDisposable disposable = _textFromEventPatternStream
        .ObserveOn(_rxConcurrencyService.Dispatcher)
        .Subscribe(async input =>
            {
                try
                {
                    IsLoading = true;
                    int x = int.Parse(input);
                    var y = await _mathApi.CalcAsync(x);
                    IsLoading = false;
                    Model.Update("", y);
                }
                catch (Exception ex)
                {
                    Model.Update(ex.Message, "Error caught in subscribe, stream continues...");
                }
                finally
                {
                    IsLoading = false;
                }
            },
            ex => Model.Update(ex.Message, "Error, stream will end..."));
However for various reasons, i think I may need to make the calls using the SelectMany operator and do some processing on the stream.
I expect that within the api calls there may be some errors. For example the API endpoint may not be available. Some of the parsing before the API call fail. Etc. I want the Hot Observable to continue. I need to display a standard IsLoading spinner as well.
Now I also understand, that once on OnError is received the sequence should not continue. I understand this... I just don't like it.
With, that, the question is: Is using Retry() the correct method of achieving a hot observable that continues to operate regardless of errors?
The below rewritten code works, but feels yucky:
    IDisposable disposable = _textFromEventPatternStream
        .Select(input => int.Parse(input)) // simulating much heavier pre processing, leading to a possible error
        .ObserveOn(_rxConcurrencyService.Dispatcher)
        .Do(_ => IsLoading = true)
        .ObserveOn(_rxConcurrencyService.TaskPool)
        .SelectMany(inputInt => _mathApi.CalcAsync(inputInt))
        .ObserveOn(_rxConcurrencyService.Dispatcher)
        .Do(s => { },
            ex =>
            {
                // this feels like a hack.
                Model.Update(ex.Message, "Error, stream will retry...");
                IsLoading = false;
            })
        .Retry()
        .Subscribe(x => Model.Update("", x),
            ex => Model.Update(ex.Message, "Error, stream will end..."));
I have seen some code examples, where people use nested streams to resubscribe to a faulted stream. From what I've read this seems like a common approach, but to me this it seems to turn what should be a simple scenario into a hard to follow situation.
If it's the CalcAsync that could throw an error, I'd try this instead:
.SelectMany(inputInt => Observable.FromAsync(() => _mathApi.CalcAsync(inputInt)).Retry())
Put the retry as close to the faulting observable as possible.
I'd also suggest some sort of retry count so that a a perpetual error doesn't just hang the observable.
Here's a sample that shows that this works.
This fails:
void Main()
{
    var subject = new Subject<string>();
    IDisposable disposable =
        subject
            .Select(input => int.Parse(input))
            .SelectMany(inputInt => Observable.FromAsync(() => CalcAsync(inputInt)))
            .Subscribe(x => Console.WriteLine(x));
    subject.OnNext("1");
    subject.OnNext("2");
    subject.OnNext("3");
    subject.OnNext("4");
    subject.OnNext("5");
    subject.OnNext("6");
    subject.OnCompleted();
}
private int _counter = 0;
public async Task<int> CalcAsync(int x)
{
    if (_counter++ == 3)
    {
        throw new Exception();
    }
    return await Task.Factory.StartNew(() => -x);
}
It typically outputs:
-1 -2 -3 Exception of type 'System.Exception' was thrown.
Change the SelectMany to:
.SelectMany(inputInt => Observable.FromAsync(() => CalcAsync(inputInt)).Retry())
Now I get:
-1 -3 -2 -4 -5 -6
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With