The Reactive Extensions come with a lot of helper methods for turning existing events and asynchronous operations into observables, but how would you implement an IObservable<T> from scratch?
IEnumerable has the lovely yield keyword to make it very simple to implement.
What is the proper way of implementing IObservable<T>?
Do I need to worry about thread safety?
I know there is support for getting called back on a specific synchronization context, but is this something I as an IObservable<T> author need to worry about, or is this somehow built in?
update:
Here's my C# version of Brian's F# solution:

    using System;
    using System.Linq;
    using Microsoft.FSharp.Collections;

    namespace Jesperll
    {
        class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
        {
            private FSharpMap<int, IObserver<T>> subscribers = FSharpMap<int, IObserver<T>>.Empty;
            private readonly object thisLock = new object();
            private int key;
            private bool isDisposed;

            public void Dispose()
            {
                Dispose(true);
            }

            protected virtual void Dispose(bool disposing)
            {
                if (disposing && !isDisposed)
                {
                    OnCompleted();
                    isDisposed = true;
                }
            }

            protected void OnNext(T value)
            {
                if (isDisposed)
                {
                    throw new ObjectDisposedException("Observable<T>");
                }

                foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
                {
                    observer.OnNext(value);
                }
            }

            protected void OnError(Exception exception)
            {
                if (isDisposed)
                {
                    throw new ObjectDisposedException("Observable<T>");
                }

                if (exception == null)
                {
                    throw new ArgumentNullException("exception");
                }

                foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
                {
                    observer.OnError(exception);
                }
            }

            protected void OnCompleted()
            {
                if (isDisposed)
                {
                    throw new ObjectDisposedException("Observable<T>");
                }

                foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
                {
                    observer.OnCompleted();
                }
            }

            public IDisposable Subscribe(IObserver<T> observer)
            {
                if (observer == null)
                {
                    throw new ArgumentNullException("observer");
                }

                lock (thisLock)
                {
                    int k = key++;
                    subscribers = subscribers.Add(k, observer);
                    return new AnonymousDisposable(() =>
                    {
                        lock (thisLock)
                        {
                            subscribers = subscribers.Remove(k);
                        }
                    });
                }
            }
        }

        class AnonymousDisposable : IDisposable
        {
            Action dispose;

            public AnonymousDisposable(Action dispose)
            {
                this.dispose = dispose;
            }

            public void Dispose()
            {
                dispose();
            }
        }
    }

edit: Don't throw ObjectDisposedException if Dispose is called twice
The official documentation discourages implementing IObservable<T> yourself. Instead, users are expected to use the factory method Observable.Create:
"When possible, implement new operators by composing existing operators. Otherwise implement custom operators using Observable.Create."
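
To make that advice concrete, here is a minimal sketch of a custom source built with Observable.Create. It assumes the System.Reactive package is referenced; `CreateExample` and `CountTo` are hypothetical names chosen for illustration, not part of Rx.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

static class CreateExample
{
    // Hypothetical helper: pushes the integers 1..count to each subscriber, then completes.
    public static IObservable<int> CountTo(int count)
    {
        return Observable.Create<int>(observer =>
        {
            for (int i = 1; i <= count; i++)
            {
                observer.OnNext(i);
            }

            observer.OnCompleted();

            // This source is synchronous, so there is nothing to clean up on unsubscribe.
            return Disposable.Empty;
        });
    }

    static void Main()
    {
        // Prints 1, 2, 3 and then "done".
        using (CountTo(3).Subscribe(
            value => Console.WriteLine(value),
            error => Console.WriteLine("error: " + error.Message),
            () => Console.WriteLine("done")))
        {
        }
    }
}
```

The delegate runs once per subscription, and the IDisposable it returns is what the subscriber disposes to unsubscribe. A source that holds a resource (a timer, an event handler) would return something like Disposable.Create(...) instead of Disposable.Empty.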
It happens that Observable.Create is a trivial wrapper around Reactive's internal class AnonymousObservable:

    public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
    {
        if (subscribe == null)
        {
            throw new ArgumentNullException("subscribe");
        }

        return new AnonymousObservable<TSource>(subscribe);
    }

I don't know why they didn't make their implementation public, but hey, whatever.
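
For a rough idea of what such a delegate-backed class involves, here is a stripped-down stand-in that uses only the base class library. Every type name in it (`DelegateObservable`, `NoopDisposable`, `RecordingObserver`, `Demo`) is hypothetical. It is not the Rx source, and the real class does more work than this, so treat it as a sketch rather than a replacement for Observable.Create.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in: forwards every Subscribe call to the supplied delegate.
sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> subscribe;

    public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        if (subscribe == null) throw new ArgumentNullException("subscribe");
        this.subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        return subscribe(observer);
    }
}

// Subscription handle for sources that have nothing to release.
sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

// Observer that records each notification as a string.
sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(T value) { Log.Add("next: " + value); }
    public void OnError(Exception error) { Log.Add("error: " + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}

static class Demo
{
    public static IObservable<string> Greetings()
    {
        return new DelegateObservable<string>(observer =>
        {
            observer.OnNext("hello");
            observer.OnCompleted();
            return new NoopDisposable();
        });
    }

    static void Main()
    {
        var observer = new RecordingObserver<string>();
        using (Greetings().Subscribe(observer))
        {
            // Prints "next: hello, completed".
            Console.WriteLine(string.Join(", ", observer.Log));
        }
    }
}
```

Note that this sketch does nothing to stop a misbehaving delegate from calling OnNext after OnCompleted, which is one reason to prefer the library's factory method over a hand-rolled class.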