Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why shouldn't I implement IObservable<T>?

Reading msdn about the Reactive Extensions and such, I've found a recommendation saying I shouldn't implement IObservable, rather use Observable.Create... By the time I've read this, my project already had an ObservableImplementation<T> class, which I'd used as an IObservable source, everywhere I wanted to transform events into Observables.

I've read the AbstractObservable<T> implementation in System.Reactive, and I haven't found any major difference between their code and mine. So what's wrong with implementing IObservable? I can add my own properties to it, and so on...

for fullness sake, here is my implementation, please tell me if I did anything wrong!

public sealed class ObservableImplementation<T> : IObservable<T>
{
    class Subscription : IDisposable
    {
        private readonly Action _onDispose;
        public Subscription(Action onDispose)
        {
            _onDispose = onDispose;
        }

        public void Dispose()
        {
            _onDispose();
        }
    }


    public void Raise(T value)
    {
        _observers.ForEach(o => o.OnNext(value));
    }
    public void Completion()
    {
        _observers.ForEach(o => o.OnCompleted());
        _observers.Clear();
    }

    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();  
    public IDisposable Subscribe(IObserver<T> observer)
    {
        var subscription = new Subscription(() => _observers.Remove(observer));
        _observers.Add(observer);
        return subscription;
    }
    public bool AnyObserverPresent { get { return _observers.Any(); } }
}
like image 371
TDaver Avatar asked May 07 '12 10:05

TDaver


3 Answers

A recent blog post from the Rx team contains three reasons. Because it is a lengthy post I have copied the relevant parts.

Enforce the contract

Observable.Create takes in a single delegate that will become the core implementation of the Subscribe method on the resulting IObservable implementation. We do some clever wrapping around this delegate to enforce the observer contract, amongst other things (which is why you shouldn’t implement the interface yourself).

Wrapper for disposable

The returned disposable has a little wrapper around it, used to ensure the observer won’t be called anymore after returning from the Dispose call, even though the scheduler may not be at a good stopping point yet. (Yet another reason you should never implement the IObservable interface by hand. Oh, and by the way, there’s even more!)

Auto dispose on completion

The point of interest here is the auto-dispose behavior that’s applied to the source subscription upon sending the OnCompleted downstream. (This is yet another reason why manual implementation of IObservable is strongly discouraged. When using Observable.Create, we take care of this for you.)

like image 84
Herman Avatar answered Sep 21 '22 05:09

Herman


There are a few reasons why we don't recommend people to implement IObservable<T> directly.

One is the lack of protection against violations of the observer grammar. For example, your sequence can exhibit behavior of OnNext calls following an OnCompleted call, which is invalid. The Observable.Create<T> method and ObservableBase<T> base type take care of this, by automatically detaching the observer upon receiving a terminal message. So even if your code does the wrong thing, the observer is not seeing a malformed sequence.

Btw, this is similar to iterators in C#. Implementing IEnumerable<T> manually should be such that when an enumerator's MoveNext returns false (analogous to OnCompleted), subsequent calls don't change their mind and start returning true (analogous to OnNext):

If MoveNext passes the end of the collection, the enumerator is positioned after the last element in the collection and MoveNext returns false. When the enumerator is at this position, subsequent calls to MoveNext also return false until Reset is called. (Source: MSDN)

When using iterators in C# 2.0 or VB 11.0, such concerns are taken care of for you. This is similar to our Observable.Create<T> method and ObservableBase<T> base type.

A reason related to the discussion above is clean-up. Upon returning from a Dispose call on a subscription, will the observer no longer see any messages? Upon sending a terminal message into the observer, will the Dispose logic for the related subscription be called automatically? Both are non-trivial to get right, so our base implementation takes care of that.

Another reason has to do with our CurrentThreadScheduler, ensuring that Subscribe calls can be asynchronous when running on that scheduler. Essentially, we need to check whether we need to install a trampoline on the current thread during a call to Subscribe. We don't expect everyone to know about this and do the right thing.

In your particular case - as noticed by others here - you're building pretty much a subject. Either just use one of our subjects, or wrap it by containment in your own type (e.g. if you want the sending "observer" side to be accessible by other parties than the receiving "observable" side).

like image 23
Bart De Smet Avatar answered Nov 09 '22 06:11

Bart De Smet


The reason you shouldn't implement IObservable<T> is the same reason you don't usually implement IEnumerable<T>, is that somebody has most likely already built the thing you want. In this case, you've basically reimplemented Subject<T> for the most part.

Edit: As to the Lazy question in the comments, I'd implement that this way:

var lazyObservable = Observable.Create<TFoo>(subj => { /*TODO: Implement Me to load from reflection*/ })
    .Multicast(new Subject<TFoo>())   // This means it'll only calc once
    .RefCount();    // This means it won't get created until someone Subscribes
like image 10
Ana Betts Avatar answered Nov 09 '22 06:11

Ana Betts