I've been happily making some API calls in a WPF app using Rx in the following manner:
IDisposable disposable = _textFromEventPatternStream
    .ObserveOn(_rxConcurrencyService.Dispatcher)
    .Subscribe(async input =>
    {
        try
        {
            IsLoading = true;
            int x = int.Parse(input);
            var y = await _mathApi.CalcAsync(x);
            IsLoading = false;
            Model.Update("", y);
        }
        catch (Exception ex)
        {
            Model.Update(ex.Message, "Error caught in subscribe, stream continues...");
        }
        finally
        {
            IsLoading = false;
        }
    },
    ex => Model.Update(ex.Message, "Error, stream will end..."));
However, for various reasons, I think I may need to make the calls using the SelectMany operator and do some processing on the stream.
I expect that there may be some errors within the API calls. For example, the API endpoint may not be available, or some of the parsing before the API call may fail. I want the hot observable to continue regardless. I also need to display a standard IsLoading spinner.
Now, I also understand that once an OnError is received the sequence should not continue. I understand this... I just don't like it.
With that, the question is: is using Retry() the correct way to get a hot observable that continues to operate regardless of errors?
The below rewritten code works, but feels yucky:
IDisposable disposable = _textFromEventPatternStream
    .Select(input => int.Parse(input)) // simulating much heavier pre processing, leading to a possible error
    .ObserveOn(_rxConcurrencyService.Dispatcher)
    .Do(_ => IsLoading = true)
    .ObserveOn(_rxConcurrencyService.TaskPool)
    .SelectMany(inputInt => _mathApi.CalcAsync(inputInt))
    .ObserveOn(_rxConcurrencyService.Dispatcher)
    .Do(s => { },
        ex =>
        {
            // this feels like a hack.
            Model.Update(ex.Message, "Error, stream will retry...");
            IsLoading = false;
        })
    .Retry()
    .Subscribe(x => Model.Update("", x),
        ex => Model.Update(ex.Message, "Error, stream will end..."));
I have seen some code examples where people use nested streams to resubscribe to a faulted stream. From what I've read this seems to be a common approach, but to me it turns what should be a simple scenario into something hard to follow.
If it's the CalcAsync that could throw an error, I'd try this instead:
.SelectMany(inputInt => Observable.FromAsync(() => _mathApi.CalcAsync(inputInt)).Retry())
Put the retry as close to the faulting observable as possible.
I'd also suggest some sort of retry count so that a perpetual error doesn't just hang the observable.
Here's a sample that shows that this works.
This fails:
void Main()
{
    var subject = new Subject<string>();
    IDisposable disposable =
        subject
            .Select(input => int.Parse(input))
            .SelectMany(inputInt => Observable.FromAsync(() => CalcAsync(inputInt)))
            .Subscribe(x => Console.WriteLine(x));
    subject.OnNext("1");
    subject.OnNext("2");
    subject.OnNext("3");
    subject.OnNext("4");
    subject.OnNext("5");
    subject.OnNext("6");
    subject.OnCompleted();
}

private int _counter = 0;

public async Task<int> CalcAsync(int x)
{
    if (_counter++ == 3)
    {
        throw new Exception();
    }
    return await Task.Factory.StartNew(() => -x);
}
It typically outputs:
-1 -2 -3 Exception of type 'System.Exception' was thrown.
Change the SelectMany to:
.SelectMany(inputInt => Observable.FromAsync(() => CalcAsync(inputInt)).Retry())
Now I get:
-1 -3 -2 -4 -5 -6
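For the retry count suggested earlier, here is a minimal sketch of how a bounded retry could look. It assumes the System.Reactive package; the class name BoundedRetryDemo, the stubbed CalcAsync that always fails for the input 3, and the "failed for ..." marker string are all made up for illustration and are not part of the original code. The idea is that Retry(3) gives up after a fixed number of attempts, and a Catch on the inner sequence turns the remaining error into an ordinary value so the outer stream never sees an OnError.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class BoundedRetryDemo
{
    // Hypothetical stand-in for the API call: it fails every time for input 3,
    // simulating a perpetual error such as an unavailable endpoint.
    private static Task<int> CalcAsync(int x) =>
        x == 3
            ? Task.FromException<int>(new InvalidOperationException("endpoint unavailable"))
            : Task.FromResult(-x);

    public static IList<string> Run()
    {
        var subject = new Subject<string>();

        IObservable<string> results = subject
            .Select(input => int.Parse(input))
            .SelectMany(inputInt =>
                Observable.FromAsync(() => CalcAsync(inputInt))
                    // In Rx.NET the argument is the total number of attempts.
                    .Retry(3)
                    .Select(y => y.ToString())
                    // Replace the inner failure with a marker value so the
                    // outer (hot) stream keeps running.
                    .Catch<string, Exception>(ex =>
                        Observable.Return("failed for " + inputInt + ": " + ex.Message)));

        // ToTask subscribes immediately, so values pushed below are not missed.
        Task<IList<string>> collected = results.ToList().ToTask();

        foreach (var s in new[] { "1", "2", "3", "4" })
        {
            subject.OnNext(s);
        }
        subject.OnCompleted();

        return collected.GetAwaiter().GetResult();
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

The results for 1, 2 and 4 should still come through, with a single "failed for 3" entry in place of an error that would otherwise end the stream. In the WPF case, the Catch handler (or a Finally on the inner sequence) would also be a natural place to reset IsLoading, though that part is not shown here.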