
Why do Reactive Extensions stop calling the subscriber after an exception is thrown when using the Where or OfType operators?

Test1 passes. Why do Test2 and Test3 fail? I'm using .NET 4.0 and Rx 2.0.

[TestClass]
public class RxQuestion
{
   private Subject<string> sequence;
   [TestInitialize] public void Initialize() { sequence = new Subject<string>(); }
   [TestMethod] public void Test1() { Test(sequence); }
   [TestMethod] public void Test2() { Test(sequence.Where(s => true)); }
   [TestMethod] public void Test3() { Test(sequence.OfType<string>()); }
   private void Test(IObservable<string> observable)
   {
      var observed = string.Empty;
      observable.Subscribe(s => { observed = s; if (s == "a") throw new Exception(); });
      try { sequence.OnNext("a"); } catch { }
      sequence.OnNext("b");
      Assert.AreEqual("b", observed);
   }
}
Robin asked Dec 07 '22 08:12



2 Answers

The real question to me is why does Test1 pass? To me it looks like the Subject<T> type is not playing by the same rules as all the other implementations of IObservable<T>.

On deeper inspection (well, reflection really), you can pull apart the Subject<T> type in DotPeek/Reflector and see that when an OnNext(T) call is made, it is delegated directly to its _observer instance. Prior to any subscriptions this is just a NullObject/NopObserver. After a subscription is made, the observer is (in general) an Observer<T> implementation. This implementation is really a composite pattern implementation of the IObserver<T> interface that just calls OnNext(T) on each of the observers it holds.

Furthermore, if we consider that we are using the Subscribe extension method that takes just an OnNext handler, we now know that our real implementation of IObserver<T> is an AnonymousObserver<T>. Opening this up, we see that any call to OnNext(T) is largely unprotected.
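
To make the unprotected path concrete, here is a minimal sketch that uses hand-rolled stand-ins rather than the real Rx types. NaiveSubject<T> and ActionObserver<T> are hypothetical names invented for illustration, and the real Subject<T> internals differ. The only point is that when OnNext is forwarded straight to the handler, the exception surfaces at the caller and the subscription stays in place.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for AnonymousObserver<T>: invokes the handler with no try/catch.
sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public ActionObserver(Action<T> onNext) { _onNext = onNext; }
    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

// Hypothetical stand-in for Subject<T>: forwards OnNext directly to its observers.
sealed class NaiveSubject<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    public void OnNext(T value)
    {
        // Snapshot so that unsubscribing during delivery does not break the loop.
        foreach (var observer in _observers.ToArray())
            observer.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _dispose;
        public Unsubscriber(Action dispose) { _dispose = dispose; }
        public void Dispose() { _dispose(); }
    }
}

static class UnprotectedDemo
{
    // Mirrors the shape of Test1 from the question, without MSTest or Rx.
    public static string Run()
    {
        var subject = new NaiveSubject<string>();
        var observed = string.Empty;
        subject.Subscribe(new ActionObserver<string>(s =>
        {
            observed = s;
            if (s == "a") throw new Exception();
        }));

        try { subject.OnNext("a"); } catch { } // the handler's exception reaches the caller
        subject.OnNext("b");                   // nothing unsubscribed the observer
        return observed;
    }
}
```

In this sketch UnprotectedDemo.Run() returns "b", the same outcome as Test1.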

Now let us compare this to the IObservable<T> implementations returned by the Where or OfType operators. Both of these extension methods return an IObservable<T> implementation that extends the Producer<T> class. When a subscription is made to one of these observable sequences, the subscribing observer is wrapped in a SafeObserver<T> implementation. This is the key difference.

If we look into this implementation, we see that for our code path, the anonymous observer has its MakeSafe method called. This wraps any call to OnNext in a try/finally.

public void OnNext(T value)
{
    if (this.isStopped != 0)
        return;
    bool flag = false;
    try
    {
        this._onNext(value);
        flag = true;        // Flag only set if OnNext doesn't throw!!
    }
    finally
    {
        // The handler threw, so tear down the subscription.
        if (!flag)
            this._disposable.Dispose();
    }
}

Note that once we have a safe observer as per above, if any OnNext handler throws, then the flag will not get set to true and the _disposable instance is disposed. In this case the _disposable instance represents the subscription.
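
The effect on the question's tests can be sketched without Rx. SafeActionObserver<T> and FlagSubscription below are hypothetical stand-ins invented for illustration; they are not the decompiled Rx classes. The push delegate plays the role of an upstream source that stops delivering once the subscription has been disposed.

```csharp
using System;

// Hypothetical stand-in for the subscription handle held by the safe observer.
sealed class FlagSubscription : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() { IsDisposed = true; }
}

// Hypothetical stand-in for the safe wrapper: disposes the subscription if the handler throws.
sealed class SafeActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly IDisposable _subscription;
    private bool _stopped;

    public SafeActionObserver(Action<T> onNext, IDisposable subscription)
    {
        _onNext = onNext;
        _subscription = subscription;
    }

    public void OnNext(T value)
    {
        if (_stopped) return;
        var succeeded = false;
        try
        {
            _onNext(value);
            succeeded = true; // only reached if the handler did not throw
        }
        finally
        {
            if (!succeeded) _subscription.Dispose();
        }
    }

    public void OnError(Exception error) { _stopped = true; }
    public void OnCompleted() { _stopped = true; }
}

static class SafeObserverDemo
{
    // Mirrors the shape of Test2/Test3 from the question, without MSTest or Rx.
    public static string Run()
    {
        var subscription = new FlagSubscription();
        var observed = string.Empty;
        var observer = new SafeActionObserver<string>(s =>
        {
            observed = s;
            if (s == "a") throw new Exception();
        }, subscription);

        // Stand-in for the upstream source: it only delivers while the subscription is live.
        Action<string> push = value =>
        {
            if (!subscription.IsDisposed) observer.OnNext(value);
        };

        try { push("a"); } catch { } // handler throws, the finally block disposes the subscription
        push("b");                   // never reaches the handler
        return observed;
    }
}
```

In this sketch SafeObserverDemo.Run() returns "a", which matches the failure seen in Test2 and Test3: the exception still propagates to the caller, but the subscription is gone by the time "b" is pushed.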

So there is your explanation as to why the raw Subject passes the test and where the seemingly innocuous operators cause a change in behavior.

As to why Subject<T> does not behave like this by default, I imagine it is due to the performance improvements that were made in the 2.0 release. I get the feeling that subjects are tuned for raw performance, on the assumption that if you are brave enough to be using them, then you know what you are doing (i.e. you are not throwing in your OnNext handlers!). I base this on the fact that they also removed concurrency safety from subjects by default, so you have to turn it on with the Synchronize() extension method; maybe they also thought all of those extra try/finally calls should only be paid for if you opt in. One way of opting in to these safety features is to do what you have done above, Where(_ => true), or more commonly AsObservable().

Lee Campbell answered May 13 '23 20:05



By definition, an IObservable<T> should not emit any more notifications after either an OnError or an OnCompleted. So when you call sequence.OnNext("b"); you are actually violating the implicit contract an IObservable<T> is supposed to adhere to.

The reason Where and OfType are behaving this way is because they are (appropriately) ignoring any notifications after the OnError you are generating.

Timothy Shields answered May 13 '23 21:05
