
Reactive Extensions swallows exceptions from OnNext() called on a thread pool thread?

I use Rx 2 in .NET 4.5. When the following code runs, it just exits silently without executing the OnCompleted delegate or showing any errors. If I use Scheduler.CurrentThread in ToObservable, it will at least throw the error and terminate the program, at which point not executing OnCompleted makes sense. But when this is executed on a thread other than the main one, this behavior seems unreasonable and unacceptable. Am I missing something?

static void Main()
{
    Enumerable.Range(0, 1)
        .ToObservable(Scheduler.Default)
        .Subscribe(o => { throw new Exception("blah"); }, () => Console.WriteLine("completed"));

    Thread.Sleep(2000);
}

Edited: Yes, when running as a console app, it will always throw the error regardless of what thread the observation is executed on.

However, when I run this code as a test in NUnit as follows, it exits silently after 2 seconds (thread sleep time) without any error or message (expecting "completed"). So is it actually NUnit causing the issue?

[TestFixture]
class Program
{
    [Test]
    public void Test()
    {
        Enumerable.Range(0, 1)
                .ToObservable(Scheduler.Default)
                .Subscribe(
                    o => { throw new Exception("blah"); }, 
                    () => Console.WriteLine("completed"));
        Thread.Sleep(2000);
    }
}
Gary Zhang asked Dec 06 '22 00:12



2 Answers

Rx does not catch exceptions thrown by observers. This is a very important design principle that has been discussed at length before, though for some reason it's only included as a footnote of §6.4 in the Rx Design Guidelines:

Note: do not protect calls to Subscribe, Dispose, OnNext, OnError and OnCompleted methods. These calls are on the edge of the monad. Calling the OnError method from these places will lead to unexpected behavior.

Essentially, this guideline ensures that, from the perspective of an observer, OnError will only be called by exceptions originating from the observable itself, including any calls to user code that participate directly in the computation (rather than merely observing the results). If this were not the case, then an observer may not be able to distinguish whether an exception passed to OnError is a bug in their OnNext handler or perhaps a bug in the observable.

But more importantly, it also ensures that any exception thrown by an OnNext handler goes unhandled. This makes it easier to debug your program and protects user data.

That being said, the reason why you may be observing different behavior when OnNext is executed on a pooled thread is simply a consequence of your debugging experience. Try enabling first-chance exceptions.

Furthermore, I'd also avoid the race condition by changing Thread.Sleep to Console.ReadKey().
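Where Console.ReadKey() is not practical, for example inside a test runner, the same race can be avoided by blocking until the sequence signals that it has terminated. The following is a minimal sketch under that assumption, not part of the original answer: the ManualResetEventSlim and the 5-second timeout are illustrative choices, and the OnNext handler deliberately does not throw.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class Program
{
    public static void Main()
    {
        // Signaled when the sequence terminates, instead of sleeping for a fixed time.
        var done = new ManualResetEventSlim();

        Enumerable.Range(0, 1)
            .ToObservable(Scheduler.Default)
            .Subscribe(
                o => Console.WriteLine("next: " + o),
                ex => { Console.WriteLine("error: " + ex.Message); done.Set(); },
                () => { Console.WriteLine("completed"); done.Set(); });

        // The timeout is an arbitrary safety net (an assumption, not from the question).
        if (!done.Wait(TimeSpan.FromSeconds(5)))
        {
            Console.WriteLine("timed out waiting for the sequence");
        }
    }
}
```

The main thread now waits only as long as the sequence needs, so the result no longer depends on whether the pooled thread happens to finish within the sleep interval.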

Dave Sexton answered Dec 08 '22 12:12



Exceptions thrown in the Subscribe block have undefined behavior. If you are doing something that can throw, you need to do that work inside a Select or SelectMany (or just wrap the code in a try-catch).
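As a rough illustration of the Select approach, the sketch below moves the throwing work out of the OnNext handler and into the query, so the exception should reach the observer through OnError instead of escaping from the handler. Process is a hypothetical stand-in for the real work, and no scheduler is passed to ToObservable here purely to keep the example synchronous; both are assumptions rather than part of the question.

```csharp
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;

public static class Program
{
    // Hypothetical stand-in for the work that used to live in the OnNext handler.
    static Unit Process(int value)
    {
        throw new Exception("blah");
    }

    public static void Main()
    {
        Enumerable.Range(0, 1)
            .ToObservable()
            // The work is now part of the computation, so Rx catches its exception.
            .Select(o => Process(o))
            .Subscribe(
                _ => { },                                         // nothing left to do per element
                ex => Console.WriteLine("error: " + ex.Message),  // receives the exception from Select
                () => Console.WriteLine("completed"));            // skipped when the sequence fails
    }
}
```

Because the sequence terminates with OnError, the OnCompleted delegate is not invoked in this case. The try-catch alternative keeps the work in the handler and deals with the exception there.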

Ana Betts answered Dec 08 '22 12:12
