Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Implementing IObservable<T> from scratch

The Reactive Extensions come with a lot of helper methods for turning existing events and asynchronous operations into observables but how would you implement an IObservable<T> from scratch?

IEnumerable has the lovely yield keyword to make it very simple to implement.

What is the proper way of implementing IObservable<T>?

Do I need to worry about thread safety?

I know there is support for getting called back on a specific synchronization context but is this something I as an IObservable<T> author need to worry about or this somehow built-in?

update:

Here's my C# version of Brian's F# solution

using System; using System.Linq; using Microsoft.FSharp.Collections;  namespace Jesperll {     class Observable<T> : IObservable<T>, IDisposable where T : EventArgs     {         private FSharpMap<int, IObserver<T>> subscribers =                   FSharpMap<int, IObserver<T>>.Empty;         private readonly object thisLock = new object();         private int key;         private bool isDisposed;          public void Dispose()         {             Dispose(true);         }          protected virtual void Dispose(bool disposing)         {             if (disposing && !isDisposed)             {                 OnCompleted();                 isDisposed = true;             }         }          protected void OnNext(T value)         {             if (isDisposed)             {                 throw new ObjectDisposedException("Observable<T>");             }              foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))             {                 observer.OnNext(value);             }         }          protected void OnError(Exception exception)         {             if (isDisposed)             {                 throw new ObjectDisposedException("Observable<T>");             }              if (exception == null)             {                 throw new ArgumentNullException("exception");             }              foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))             {                 observer.OnError(exception);             }         }          protected void OnCompleted()         {             if (isDisposed)             {                 throw new ObjectDisposedException("Observable<T>");             }              foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))             {                 observer.OnCompleted();             }         }          public IDisposable Subscribe(IObserver<T> observer)         {             if (observer == null)             {                 throw new ArgumentNullException("observer");             }              lock (thisLock)             {                 int k = key++;                 subscribers = subscribers.Add(k, observer);                 return new AnonymousDisposable(() =>                 {                     lock (thisLock)                     {                         subscribers = subscribers.Remove(k);                     }                 });             }         }     }      class AnonymousDisposable : IDisposable     {         Action dispose;         public AnonymousDisposable(Action dispose)         {             this.dispose = dispose;         }          public void Dispose()         {             dispose();         }     } } 

edit: Don't throw ObjectDisposedException if Dispose is called twice

like image 340
Jesper Larsen-Ledet Avatar asked Nov 20 '09 07:11

Jesper Larsen-Ledet


1 Answers

The official documentation deprecates users implementing IObservable themselves. Instead, users are expected to use the factory method Observable.Create

When possible, implement new operators by composing existing operators. Otherwise implement custom operators using Observable.Create

It happens that Observable.Create is a trivial wrapper around Reactive's internal class AnonymousObservable:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe) {     if (subscribe == null)     {         throw new ArgumentNullException("subscribe");     }     return new AnonymousObservable<TSource>(subscribe); } 

I don't know why they didn't make their implementation public, but hey, whatever.

like image 199
Colonel Panic Avatar answered Oct 11 '22 12:10

Colonel Panic