
Combine multiple event sources into one IObservable with Rx

This is a question about how to use Reactive Extensions (Rx) in a specific event-related scenario.

  • The aim is to take a number of classes that each raise some event
  • and aggregate them into one IObservable that any client can subscribe to (without knowing about the event classes).
  • Note that the events of interest use subclassed EventArgs.

Some Custom EventArgs

public class HappenedEventArgs : EventArgs
{
    internal bool IsBadNotGood;
}

Many Separate Classes Where Happened Events Occur

public class EventSourceA : IEventSource {

    public event HappenedEventHandler Happened;
    private void OnHappened(HappenedEventArgs e)
    {
        if (Happened != null)
            Happened(this, e);
    }
    // And then this class calls OnHappened(e) whenever it decides to ...
}

public class EventSourceB : IEventSource {

    public event HappenedEventHandler Happened;
    private void OnHappened(HappenedEventArgs e)
    {
        if (Happened != null)
            Happened(this, e);
    }
    // And then this class also calls OnHappened(e) at times ...
}

public interface IEventSource
{
    event HappenedEventHandler Happened;
}

public delegate void HappenedEventHandler(object sender, HappenedEventArgs e);

How to Congregate All Those Events and Expose a United Event Front

public class Pooler{

    private IObservable<X> _pool;

    public IObservable<X> Subscribe(){
        return _pool;        
    }

    public void Register(IEventSource item)
    {
        // How to take item.Happened and inject/bind it into _pool here?
    }        

    internal void Unregister(IEventSource item)
    {
        // Disconnect item.Happened from _pool
    }

    public Pooler(){
        // Instantiate _pool to whatever is best?
        // _pool = ...
    }

 }

A Subscriber Who Doesn't Know Anything About EventSources Directly

 static void Try() {
     var pooler = new Pooler();
     pooler.Subscribe().Subscribe(e =>
            {
                 // Do something with events here, as they arrive
            }
     );
     // ....
     // Wherever whenever:
     AddEventSources(pooler);
 }

 static void AddEventSources(Pooler pooler){
     var eventSourceA = new EventSourceA();
     pooler.Register(eventSourceA);
     var eventSourceB = new EventSourceB();
     pooler.Register(eventSourceB);     
 }

Asked by Cel, Nov 19 '11 13:11


2 Answers

What the Rx library tries to provide are ways of handling situations such as these without having to create a bunch of classes/methods that manually propagate observables.

Let's say you had a class with an event:

public class EventedClass
{
    public event Action<EventArgs> Event;
}

Given an enumerable of those instances, IEnumerable<EventedClass> objects, you can use LINQ to project an observable out of each instance and combine them with Observable.Merge, which gives you a single stream carrying the events of all instances in the order they are raised.

Observable.Merge(
    objects.Select(
        o => Observable.FromEvent<EventArgs>(
            handler => o.Event += handler,
            handler => o.Event -= handler
        )
)).Subscribe(args => 
{ 
    //do stuff
});
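
Applied to the types from the question, the same idea might look like the sketch below. It assumes the System.Reactive NuGet package; DemoSource, its Raise method, and MergeSketch.MergeAll are hypothetical names added only so the example can be run. Because IEventSource.Happened uses the custom HappenedEventHandler delegate, the sketch uses the FromEventPattern overload that names the delegate type explicitly.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; // from the System.Reactive NuGet package

public class HappenedEventArgs : EventArgs
{
    public bool IsBadNotGood;
}

public delegate void HappenedEventHandler(object sender, HappenedEventArgs e);

public interface IEventSource
{
    event HappenedEventHandler Happened;
}

// Hypothetical source with a public Raise method so the sketch can trigger events.
public class DemoSource : IEventSource
{
    public event HappenedEventHandler Happened;

    public void Raise(bool isBad)
    {
        Happened?.Invoke(this, new HappenedEventArgs { IsBadNotGood = isBad });
    }
}

public static class MergeSketch
{
    // Turns each source's Happened event into an observable and merges them into one stream.
    public static IObservable<HappenedEventArgs> MergeAll(IEnumerable<IEventSource> sources)
    {
        return Observable.Merge(
            sources.Select(s =>
                Observable.FromEventPattern<HappenedEventHandler, HappenedEventArgs>(
                    h => s.Happened += h,
                    h => s.Happened -= h)
                .Select(ep => ep.EventArgs)));
    }
}
```

A client would call MergeSketch.MergeAll(sources).Subscribe(e => ...) and dispose the returned IDisposable to detach from every source at once. The trade-off is that the set of sources is fixed at the time of the call, so this fits best when sources do not need to be registered or unregistered later.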

Answered by Asti, Nov 05 '22 21:11


It sounds like you are doing something similar to this question. Basically, you want to use a subject as your _pool variable and have it subscribe and unsubscribe to the different event sources in Register and Unregister. To unregister a source, you will need to keep the disposables you got in the call to Register. Also, I would consider making Pooler implement IObservable directly and just forward Subscribe to the _pool variable.

using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using System.Reactive.Linq;

public class Pooler 
    : IObservable<HappenedEventArgs>, 
      IDisposable
{

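    // Must be public to implement IDisposable.Dispose.
    public void Dispose()
    {
        // Detach from every event source first, then release the subject.
        if (_sourceSubs != null)
        {
            foreach (var d in _sourceSubs.Values)
            {
                d.Dispose();
            }
            _sourceSubs.Clear();
        }
        if (_pool != null) _pool.Dispose();
    }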

    private Subject<HappenedEventArgs> _pool = new Subject<HappenedEventArgs>();
    private Dictionary<IEventSource, IDisposable> _sourceSubs = new Dictionary<IEventSource, IDisposable>();

    public IDisposable Subscribe(IObserver<HappenedEventArgs> observer)
    {
        return _pool.Subscribe(observer);
    }

    public void Register(IEventSource item)
    {
        if (_sourceSubs.ContainsKey(item))
        {
            return; //already registered
        }
        else
        {
            // IEventSource.Happened is a HappenedEventHandler, so name the delegate type explicitly.
            _sourceSubs.Add(item,
                            Observable.FromEventPattern<HappenedEventHandler, HappenedEventArgs>(
                                          h => item.Happened += h,
                                          h => item.Happened -= h)
                                      .Select(ep => ep.EventArgs)
                                      .Subscribe(_pool));
        }
    }

    internal void Unregister(IEventSource item)
    {
        IDisposable disp = null;
        if (_sourceSubs.TryGetValue(item, out disp))
        {
            _sourceSubs.Remove(item);
            disp.Dispose();
        }
    }
}

Note that you will need to implement IDisposable to ensure that you can clean up all the event subscriptions when you are done with the Pooler.
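
The cleanup point can be checked with a small sketch. It assumes the System.Reactive NuGet package and a context without a SynchronizationContext (for example a console app), where FromEventPattern attaches and detaches the handler synchronously. CountingSource, HandlerCount, and CleanupSketch are hypothetical names used only for this illustration.

```csharp
using System;
using System.Reactive.Linq;     // from the System.Reactive NuGet package
using System.Reactive.Subjects;

public class HappenedEventArgs : EventArgs
{
    public bool IsBadNotGood;
}

public delegate void HappenedEventHandler(object sender, HappenedEventArgs e);

// Hypothetical source that exposes how many handlers are attached to its event.
public class CountingSource
{
    public event HappenedEventHandler Happened;

    public int HandlerCount
    {
        get { return Happened == null ? 0 : Happened.GetInvocationList().Length; }
    }
}

public static class CleanupSketch
{
    // Returns the handler counts before subscribing, while subscribed, and after disposal.
    public static int[] Run()
    {
        var source = new CountingSource();
        var pool = new Subject<HappenedEventArgs>();

        int before = source.HandlerCount;

        // Same wiring as Register: the returned IDisposable owns the event handler.
        IDisposable sub = Observable
            .FromEventPattern<HappenedEventHandler, HappenedEventArgs>(
                h => source.Happened += h,
                h => source.Happened -= h)
            .Select(ep => ep.EventArgs)
            .Subscribe(pool);

        int during = source.HandlerCount;

        // Same step as Unregister: disposing removes the handler from the event.
        sub.Dispose();
        int after = source.HandlerCount;

        pool.Dispose();
        return new[] { before, during, after };
    }
}
```

If a subscription is never disposed, the event source keeps a reference to the handler, which is why the Pooler stores one IDisposable per registered source.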

Answered by Gideon Engelberth, Nov 05 '22 21:11