
How to implement an event using Reactive Extensions

The Reactive Extensions allow you to easily subscribe to an event using Observable.FromEventPattern, but I can't find anything on how you might implement an event when you have an IObservable.

My situation is this: I need to implement an interface which contains an event. That event is supposed to be raised whenever a certain value of my object changes, and for thread-safety reasons I need to raise it on a certain SynchronizationContext. Each event handler is also supposed to be called with the current value when it is registered.

public interface IFooWatcher
{
    event FooChangedHandler FooChanged;
}

Getting an observable that does what I want is rather easy with Rx using BehaviorSubject:

public class FooWatcher : IFooWatcher
{
    private readonly BehaviorSubject<Foo> m_subject;
    private readonly IObservable<Foo> m_observable;

    public FooWatcher(SynchronizationContext synchronizationContext, Foo initialValue)
    {
        m_subject = new BehaviorSubject<Foo>(initialValue);
        m_observable = m_subject
            .DistinctUntilChanged()
            .ObserveOn(synchronizationContext);
    }

    public event FooChangedHandler FooChanged
    {
        add { /* ??? */ }
        remove { /* ??? */ }
    }
}

Now I am looking for an easy way to have the add and remove accessors subscribe and unsubscribe the passed FooChangedHandler as an IObserver<Foo> on m_observable. My current implementation looks similar to this:

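    // Assumes FooChangedHandler takes a single Foo argument, and that m_lock and
    // m_registeredObservers (a List<KeyValuePair<FooChangedHandler, IDisposable>>)
    // are fields of FooWatcher.
    add
    {
        lock (m_lock)
        {
            // Wrap the handler in a lambda: Subscribe accepts an Action<Foo>,
            // not a FooChangedHandler.
            IDisposable disp = m_observable.Subscribe(foo => value(foo));
            m_registeredObservers.Add(
                new KeyValuePair<FooChangedHandler, IDisposable>(
                    value, disp));
        }
    }

    remove
    {
        lock (m_lock)
        {
            // Note: First throws if the handler was never registered.
            KeyValuePair<FooChangedHandler, IDisposable> observerDisposable =
                m_registeredObservers
                    .First(pair => object.Equals(pair.Key, value));
            m_registeredObservers.Remove(observerDisposable);
            observerDisposable.Value.Dispose();
        }
    }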

However, I hope to find an easier solution, because I need to implement several of these events (of differing handler types). I tried to roll my own generic solution but it creates some additional problems that need to be worked around (in particular, how you generically work with a delegate that takes a parameter of T), so I would prefer to find an existing solution that bridges the gap in this direction - just as FromEventPattern does the reverse.

Medo42 asked Mar 05 '13 18:03


1 Answer

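You could do this, using ToEvent() to turn the observable into an IEventSource<Foo>:

// Assign once in the constructor: m_eventSource = m_observable.ToEvent();
private readonly IEventSource<Foo> m_eventSource;

public event Action<Foo> FooChanged
{
    add { m_eventSource.OnNext += value; }
    remove { m_eventSource.OnNext -= value; }
}

The event source returned by ToEvent() has to be stored as a member. Each call to ToEvent() creates a new event source that tracks its own subscriptions, so calling ToEvent() again in remove would not dispose the subscription that add created.

EDIT: Note that the event has to be declared as Action<Foo> rather than with the FooChangedHandler delegate, because IEventSource<Foo>.OnNext is an event of type Action<Foo>.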

EDIT 2: Here's a tested version. I suppose you need to use FooChangedHandler, however, since you have a bunch of these pre-existing handlers?

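// LINQPad script; Dump() is a LINQPad extension method.
void Main()
{
    IObservable<Foo> foos = new [] { new Foo { X = 1 }, new Foo { X = 2 } }.ToObservable();
    var watcher = new FooWatcher(SynchronizationContext.Current, new Foo { X = 12 });
    // The handler is called right away with the current value (X = 12).
    watcher.FooChanged += o => o.X.Dump();
    // Push the values from foos through the subject, which raises the event for each.
    foos.Subscribe(watcher.Subject.OnNext);
}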

// Define other methods and classes here

//public delegate void FooChangedHandler(Foo foo);
public interface IFooWatcher
{
    event Action<Foo> FooChanged;
}

public class Foo {
    public int X { get; set; }
}
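public class FooWatcher : IFooWatcher
{
    private readonly BehaviorSubject<Foo> m_subject;
    public BehaviorSubject<Foo> Subject { get { return m_subject; } }
    private readonly IObservable<Foo> m_observable;
    // Created once so that add and remove operate on the same event source;
    // each call to ToEvent() returns a new one with its own subscriptions.
    private readonly IEventSource<Foo> m_eventSource;

    public FooWatcher(SynchronizationContext synchronizationContext, Foo initialValue)
    {
        m_subject = new BehaviorSubject<Foo>(initialValue);

        // synchronizationContext is unused in this test version; append
        // .ObserveOn(synchronizationContext) as in the question to marshal
        // notifications onto it.
        m_observable = m_subject
            .DistinctUntilChanged();
        m_eventSource = m_observable.ToEvent();
    }

    public event Action<Foo> FooChanged
    {
        add { m_eventSource.OnNext += value; }
        remove { m_eventSource.OnNext -= value; }
    }
}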
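
If the event has to keep its FooChangedHandler type, one possible approach is a small helper that maps each handler to its subscription, much like the add/remove code in the question but reusable for several handler types. The following is only a sketch under assumptions: EventBridge and ActionObserver are hypothetical names (they are not part of Rx), FooChangedHandler is assumed to take a single Foo as in the commented-out delegate above, and the generic Delegate constraint needs C# 7.3 or later. It uses only IObservable<T> and IObserver<T> from the System namespace, so it works with any observable, including one built with ObserveOn.

```csharp
using System;
using System.Collections.Generic;

// Assumed shapes, taken from the commented-out delegate in the answer above.
public class Foo { public int X { get; set; } }
public delegate void FooChangedHandler(Foo foo);

// Hypothetical helper (not part of Rx): maps handlers of an arbitrary
// delegate type onto subscriptions of an IObservable<T>.
public sealed class EventBridge<THandler, T> where THandler : Delegate
{
    private readonly IObservable<T> _source;
    private readonly Func<THandler, Action<T>> _toAction;
    private readonly object _gate = new object();
    // A handler may be added more than once, hence a stack per handler.
    private readonly Dictionary<THandler, Stack<IDisposable>> _subscriptions =
        new Dictionary<THandler, Stack<IDisposable>>();

    public EventBridge(IObservable<T> source, Func<THandler, Action<T>> toAction)
    {
        _source = source ?? throw new ArgumentNullException(nameof(source));
        _toAction = toAction ?? throw new ArgumentNullException(nameof(toAction));
    }

    public void Add(THandler handler)
    {
        if (handler == null) return;
        // Subscribe outside the lock so that a handler invoked synchronously
        // during Subscribe is not called while the lock is held.
        IDisposable subscription =
            _source.Subscribe(new ActionObserver(_toAction(handler)));
        lock (_gate)
        {
            if (!_subscriptions.TryGetValue(handler, out Stack<IDisposable> stack))
            {
                stack = new Stack<IDisposable>();
                _subscriptions.Add(handler, stack);
            }
            stack.Push(subscription);
        }
    }

    public void Remove(THandler handler)
    {
        if (handler == null) return;
        IDisposable subscription = null;
        lock (_gate)
        {
            if (_subscriptions.TryGetValue(handler, out Stack<IDisposable> stack))
            {
                subscription = stack.Pop();
                if (stack.Count == 0) _subscriptions.Remove(handler);
            }
        }
        // Removing a handler that was never added is a no-op, as with a regular event.
        subscription?.Dispose();
    }

    // Minimal observer that forwards values; errors and completion are
    // ignored in this sketch.
    private sealed class ActionObserver : IObserver<T>
    {
        private readonly Action<T> _onNext;
        public ActionObserver(Action<T> onNext) { _onNext = onNext; }
        public void OnNext(T value) { _onNext(value); }
        public void OnError(Exception error) { }
        public void OnCompleted() { }
    }
}

// Possible usage inside FooWatcher (m_observable as in the question):
//   m_fooChanged = new EventBridge<FooChangedHandler, Foo>(
//       m_observable, handler => foo => handler(foo));
//
//   public event FooChangedHandler FooChanged
//   {
//       add { m_fooChanged.Add(value); }
//       remove { m_fooChanged.Remove(value); }
//   }
```

The converter function is what sidesteps the "delegate that takes a parameter of T" problem: each event supplies its own one-line lambda from its handler type to Action<T>, so the bookkeeping stays generic. One limitation of this sketch is that a handler which removes itself during the initial synchronous callback will not yet be registered at that point, so that case would need extra care.
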
Richard Anthony Freeman-Hein answered Sep 27 '22 19:09