Test1 passes. Why do Test2 and Test3 fail? I'm using .NET 4.0 and Rx 2.0.
[TestClass]
public class RxQuestion
{
    private Subject<string> sequence;
    [TestInitialize] public void Intialize() { sequence = new Subject<string>(); }
    [TestMethod] public void Test1() { Test(sequence); }
    [TestMethod] public void Test2() { Test(sequence.Where(s => true)); }
    [TestMethod] public void Test3() { Test(sequence.OfType<string>()); }
    private void Test(IObservable<string> observable)
    {
        var observed = string.Empty;
        observable.Subscribe(s => { observed = s; if (s == "a") throw new Exception(); });
        try { sequence.OnNext("a"); } catch { }
        sequence.OnNext("b");
        Assert.AreEqual("b", observed);
    }
}
The real question to me is: why does Test1 pass? To me it looks like the Subject<T> type is not playing by the same rules as all the other implementations of IObservable<T>.
On deeper inspection (well, reflection really), you can pull apart the Subject<T> type in DotPeek/Reflector and see that when an OnNext(T) call is made, it is delegated directly to its _observer instance. Prior to any subscriptions this is just a NullObject/NopObserver. After a subscription is made, the observer is (in general) an Observer<T> implementation. This implementation is really a composite-pattern implementation of the IObserver<T> interface that just calls OnNext(T) on each of the observers it holds.
Furthermore, since we are using the Subscribe extension method that takes only an OnNext handler, we know that our real implementation of IObserver<T> is an AnonymousObserver<T>. Opening this up, we see that any call to OnNext(T) is largely unprotected.
Now let us compare this to the IObservable<T> implementations from the Where or Cast operators. Both of these extension methods return an IObservable<T> implementation that extends the Producer<T> class. When subscriptions are made to one of these observable sequences, the subscribing observer is wrapped by a SafeObserver<T> implementation. This is the key difference.
If we look into this implementation, we see that for our code path, the anonymous observer we have will have its MakeSafe method called. This wraps any call to OnNext with a try/finally.
public void OnNext(T value)
{
    if (this.isStopped != 0)
        return;
    bool flag = false; // must start false, otherwise the finally block could never dispose
    try
    {
        this._onNext(value);
        flag = true; // Flag only set if OnNext doesn't throw!!
    }
    finally
    {
        if (!flag)
            this._disposable.Dispose();
    }
}
Note that once we have a safe observer as per above, if any OnNext handler throws, then the flag will not get set to true and the _disposable instance is disposed. In this case the _disposable instance represents the subscription.
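To make the effect of that try/finally guard concrete, here is a minimal, self-contained sketch that does not use Rx at all. MiniSubject<T>, SafeSubscription.SubscribeSafe and Demo.Run are hypothetical names invented for this illustration; they only mimic the behaviour described above and are not the actual Rx types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a subject: forwards each value to its handlers, unprotected.
public sealed class MiniSubject<T>
{
    private readonly List<Action<T>> handlers = new List<Action<T>>();

    public IDisposable Subscribe(Action<T> onNext)
    {
        handlers.Add(onNext);
        return new Unsubscriber(() => handlers.Remove(onNext));
    }

    public void OnNext(T value)
    {
        // Iterate over a copy so a handler can be removed while we are dispatching.
        foreach (var handler in handlers.ToArray())
            handler(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action dispose;
        public Unsubscriber(Action dispose) { this.dispose = dispose; }
        public void Dispose() { dispose(); }
    }
}

public static class SafeSubscription
{
    // Hypothetical analogue of the safe-observer wrapping: if the handler throws,
    // the subscription is disposed before the exception propagates to the caller.
    public static IDisposable SubscribeSafe<T>(MiniSubject<T> source, Action<T> onNext)
    {
        IDisposable subscription = null;
        subscription = source.Subscribe(value =>
        {
            var succeeded = false;
            try
            {
                onNext(value);
                succeeded = true; // only reached if onNext did not throw
            }
            finally
            {
                if (!succeeded) subscription.Dispose();
            }
        });
        return subscription;
    }
}

public static class Demo
{
    // Mirrors the Test method from the question; returns the last value observed.
    public static string Run(bool safe)
    {
        var subject = new MiniSubject<string>();
        var observed = string.Empty;
        Action<string> handler = s => { observed = s; if (s == "a") throw new Exception(); };
        if (safe) SafeSubscription.SubscribeSafe(subject, handler);
        else subject.Subscribe(handler);
        try { subject.OnNext("a"); } catch { }
        subject.OnNext("b");
        return observed;
    }
}
```

In this sketch Demo.Run(false) returns "b" because the unprotected handler stays subscribed, while Demo.Run(true) returns "a" because the wrapper disposed the subscription when the handler threw, so "b" is never delivered.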
So there is your explanation as to why the raw Subject passes the test and why the seemingly innocuous operators cause a change in behavior.
As to why Subject<T> does not behave like this by default, I imagine this is due to the performance improvements that were made in the 2.0 release. I get the feeling that subjects are tuned for raw performance and assume that if you are brave enough to be using them, then you know what you are doing (i.e. not throwing in your OnNext handlers!). I base this assumption on the fact that they also removed the concurrency safety from subjects by default, so you have to turn it on with the Synchronize() extension method. Maybe they also thought that all of those extra try/finally calls should only be paid for if you opt in. One way of opting in to these safety features is to do what you have done above with Where(_=>true), or more commonly AsObservable().
By definition, an IObservable<T> should not emit any more notifications after either an OnError or an OnCompleted. So when you call sequence.OnNext("b"); you are actually violating the implicit contract an IObservable<T> is supposed to adhere to.
The reason Where and OfType behave this way is that they are (appropriately) ignoring any notifications after the OnError you are generating.