This is a question about how to use Reactive Extensions (Rx) in a specific event-related scenario.
The goal is to provide an IObservable that can be subscribed to by any clients (unaware of the event classes). The event uses a subclass of EventArgs:
/// <summary>
/// Event data passed to <c>HappenedEventHandler</c> subscribers.
/// </summary>
public class HappenedEventArgs : EventArgs
{
    /// <summary>
    /// Boolean flag carried with the event; accessible only within the declaring assembly.
    /// </summary>
    internal bool IsBadNotGood;
}
/// <summary>
/// Event source that exposes the <see cref="Happened"/> event declared by <c>IEventSource</c>.
/// </summary>
public class EventSourceA : IEventSource
{
    /// <summary>Raised by <see cref="OnHappened"/>.</summary>
    public event HappenedEventHandler Happened;

    /// <summary>
    /// Raises <see cref="Happened"/> with this instance as the sender.
    /// Does nothing when there are no subscribers.
    /// </summary>
    /// <param name="e">Event data handed to every subscriber.</param>
    private void OnHappened(HappenedEventArgs e)
    {
        // Snapshot the delegate first: if the last subscriber is removed between
        // the null check and the invocation, reading the field twice would throw.
        HappenedEventHandler handler = Happened;
        if (handler != null)
        {
            handler(this, e);
        }
    }

    // And then this class calls OnHappened(e) whenever it decides to ...
}
/// <summary>
/// Second event source that exposes the <see cref="Happened"/> event declared by <c>IEventSource</c>.
/// </summary>
public class EventSourceB : IEventSource
{
    /// <summary>Raised by <see cref="OnHappened"/>.</summary>
    public event HappenedEventHandler Happened;

    /// <summary>
    /// Raises <see cref="Happened"/> with this instance as the sender.
    /// Does nothing when there are no subscribers.
    /// </summary>
    /// <param name="e">Event data handed to every subscriber.</param>
    private void OnHappened(HappenedEventArgs e)
    {
        // Snapshot the delegate first: if the last subscriber is removed between
        // the null check and the invocation, reading the field twice would throw.
        HappenedEventHandler handler = Happened;
        if (handler != null)
        {
            handler(this, e);
        }
    }

    // And then this class also calls OnHappened(e) at times ...
}
/// <summary>
/// Contract for any object that publishes the <c>Happened</c> event.
/// </summary>
public interface IEventSource
{
    /// <summary>
    /// Notification event; handlers receive the sender and a <c>HappenedEventArgs</c> instance.
    /// </summary>
    event HappenedEventHandler Happened;
}
/// <summary>
/// Handler signature for the <c>Happened</c> event.
/// </summary>
/// <param name="sender">Object supplied as the first argument when the event is raised.</param>
/// <param name="e">Event data for the notification.</param>
public delegate void HappenedEventHandler(object sender, HappenedEventArgs e);
// Question skeleton: a pool meant to expose the events of registered sources
// through a single observable. The method bodies are intentionally unfinished.
// NOTE(review): X is a placeholder type that is not declared in this snippet.
public class Pooler{
// Backing observable handed out by Subscribe(); the code shown never assigns it.
private IObservable<X> _pool;
// Returns the backing observable unchanged.
public IObservable<X> Subscribe(){
return _pool;
}
// Placeholder: should connect item.Happened to _pool; currently does nothing.
public void Register(IEventSource item)
{
// How to take item.Happened and inject/bind it into _pool here?
}
// Placeholder: should undo Register for the given source; currently does nothing.
internal void Unregister(IEventSource item)
{
// Disconnect item.Happened from _pool
}
// Placeholder constructor: _pool is left unassigned until the line below is filled in.
public Pooler(){
// Instantiate _pool to whatever is best?
// _pool = ...
}
}
/// <summary>
/// Usage sketch: subscribes to the pooled stream first, then registers the event sources.
/// </summary>
static void Try()
{
    var pooler = new Pooler();
    var pooledEvents = pooler.Subscribe();

    pooledEvents.Subscribe(e =>
    {
        // Do something with events here, as they arrive
    });

    // ....
    // Wherever whenever:
    AddEventSources(pooler);
}
/// <summary>
/// Creates one <c>EventSourceA</c> and one <c>EventSourceB</c> and registers them, in that order.
/// </summary>
/// <param name="pooler">Pool that receives the newly created sources.</param>
static void AddEventSources(Pooler pooler)
{
    pooler.Register(new EventSourceA());
    pooler.Register(new EventSourceB());
}
What the Rx library tries to provide are ways of handling situations such as these without having to create a bunch of classes/methods that manually propagate observables.
Let's say you had a class with an event:
/// <summary>
/// Minimal example type exposing a single event.
/// </summary>
public class EventedClass
{
    /// <summary>
    /// Event whose handlers take one <see cref="EventArgs"/> argument.
    /// </summary>
    public event Action<EventArgs> Event;
}
Given an enumerable of those instances, IEnumerable<EventedClass> objects,
you can use LINQ to project an observable out of each instance and combine them with Observable.Merge, which gives you the combined sequential output of those events.
// Project each instance's Event into its own observable (lazily, one per object).
var eventStreams = objects.Select(
    source => Observable.FromEvent<EventArgs>(
        add => source.Event += add,
        remove => source.Event -= remove));

// Merge the per-object streams and handle every notification in one place.
Observable.Merge(eventStreams).Subscribe(args =>
{
    //do stuff
});
It sounds like you are doing something similar to this question. Basically, you want to use a subject as your _pool variable and have it subscribe and unsubscribe to the different event sources in Register and Unregister. To unregister a source, you will need to keep the disposables you got in the call to Register. Also, I would consider making Pooler implement IObservable directly and just forward Subscribe to the _pool variable.
using System.Reactive.Subjects;
using System.Reactive.Linq;
/// <summary>
/// Funnels the <c>Happened</c> events of every registered <see cref="IEventSource"/>
/// into a single observable sequence of <see cref="HappenedEventArgs"/>.
/// </summary>
public class Pooler
    : IObservable<HappenedEventArgs>,
      IDisposable
{
    // Subject that every registered source pushes into and every client subscribes to.
    private readonly Subject<HappenedEventArgs> _pool = new Subject<HappenedEventArgs>();

    // One subscription per registered source; disposing it detaches the event handler.
    private readonly Dictionary<IEventSource, IDisposable> _sourceSubs = new Dictionary<IEventSource, IDisposable>();

    /// <summary>
    /// Detaches from every registered source and then disposes the underlying subject.
    /// </summary>
    public void Dispose()
    {
        // Unhook the sources first so that no event raised during teardown is
        // forwarded into an already-disposed subject.
        foreach (var subscription in _sourceSubs.Values)
        {
            subscription.Dispose();
        }
        _sourceSubs.Clear();

        _pool.Dispose();
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the pooled event stream.
    /// </summary>
    /// <param name="observer">Receiver of the pooled notifications.</param>
    /// <returns>A handle that ends the observer's subscription when disposed.</returns>
    public IDisposable Subscribe(IObserver<HappenedEventArgs> observer)
    {
        return _pool.Subscribe(observer);
    }

    /// <summary>
    /// Starts forwarding <paramref name="item"/>'s <c>Happened</c> events into the pool.
    /// Registering the same source again is a no-op.
    /// </summary>
    /// <param name="item">Event source to attach.</param>
    public void Register(IEventSource item)
    {
        if (_sourceSubs.ContainsKey(item))
        {
            return; //already registered
        }

        // The event is declared with the custom HappenedEventHandler delegate, so
        // the delegate type has to be named explicitly; an EventHandler<T> lambda
        // cannot be added to item.Happened.
        IDisposable subscription = Observable
            .FromEventPattern<HappenedEventHandler, HappenedEventArgs>(
                h => item.Happened += h,
                h => item.Happened -= h)
            .Select(ep => ep.EventArgs)
            .Subscribe(_pool);

        _sourceSubs.Add(item, subscription);
    }

    /// <summary>
    /// Stops forwarding <paramref name="item"/>'s events. Unknown sources are ignored.
    /// </summary>
    /// <param name="item">Event source to detach.</param>
    internal void Unregister(IEventSource item)
    {
        IDisposable disp = null;
        if (_sourceSubs.TryGetValue(item, out disp))
        {
            _sourceSubs.Remove(item);
            disp.Dispose();
        }
    }
}
Note that you will need to implement IDisposable to ensure that you can clean up all the event subscriptions when you are done with the Pooler.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With