Given the following, why is the OnError handler in Subscribe never called?
    var observable = Observable.Create<string>(
        async (o, c) =>
        {
            try
            {
                var strings = new[] { "A", "B", "C" };
                foreach (var s in strings)
                {
                    await Task.Delay(100);
                    if (c.IsCancellationRequested)
                    {
                        // exception thrown here.
                        Console.WriteLine("cancelled");
                        throw new OperationCanceledException();
                    }
                    o.OnNext(s);
                }
                o.OnCompleted();
            }
            catch (Exception ex)
            {
                // caught here...
                o.OnError(ex);
            }
        });

    var tcs = new TaskCompletionSource<bool>();
    var token = new CancellationTokenSource();

    observable.Subscribe(
        str =>
        {
            Console.WriteLine(str);
            token.Cancel(); // cancel after the first iteration.
        },
        (e) =>
        {
            // why is this never called?
            Console.WriteLine($"on error :: {e.Message}");
            tcs.SetResult(true);
        },
        () =>
        {
            Console.WriteLine("on complete");
            tcs.SetResult(true);
        },
        token.Token);

    // code hangs here because the subscription never completes?
    await tcs.Task;
    Console.WriteLine("done");
When you call Cancel on the token, you (the subscriber, who passed the token and therefore "owns" cancellation) are basically saying "I am no longer interested in events, including OnError()".
Under the covers, Rx inserts an AutoDetachObserver between the observable and the observer, which explicitly swallows all further events.
This behaviour is by design.
OnError() is there to tell you about failures while you are subscribed. After cancelling the token (which unsubscribes the observer), you aren't subscribed any more and receive no more events. In other words, cancelling a subscription is not an error. And cancelling the token is not a valid means of communicating an error from inside the observable; calling OnError() is.
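If the goal is simply to stop the final await from hanging, one option is to let the subscriber, which owns the token, react to its own cancellation. The following is a minimal sketch rather than a definitive fix. It assumes the System.Reactive package is referenced; the use of Register with TrySetResult, the variable name cts, and the convention of signalling cancellation with false are illustrative choices that are not part of the original code.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

var observable = Observable.Create<string>(async (o, c) =>
{
    foreach (var s in new[] { "A", "B", "C" })
    {
        await Task.Delay(100);
        // The subscriber has gone away, so there is nobody left to notify.
        if (c.IsCancellationRequested) return;
        o.OnNext(s);
    }
    o.OnCompleted();
});

var tcs = new TaskCompletionSource<bool>();
var cts = new CancellationTokenSource();

// Assumption: cancellation is reported to the waiter as 'false'.
// The Try* methods are used because more than one path may try to complete the task.
cts.Token.Register(() => tcs.TrySetResult(false));

observable.Subscribe(
    str =>
    {
        Console.WriteLine(str);
        cts.Cancel(); // cancel after the first item, as in the question
    },
    e => tcs.TrySetException(e),   // genuine failures while still subscribed
    () => tcs.TrySetResult(true),  // normal completion
    cts.Token);

var completed = await tcs.Task;
Console.WriteLine(completed ? "completed" : "cancelled");
```

With this arrangement the observable no longer needs to throw on cancellation: it just stops producing, and OnError() stays reserved for real failures that occur while the observer is still subscribed.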