Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using Rx to synchronize asynchronous events

I want to put Reactive Extensions for .NET (Rx) to good use and would like to get some input on doing some basic tasks. To illustrate what I'm trying to do I have a contrived example where I have an external component with asyncronous events:

class Component {

  public void BeginStart() { ... }

  public event EventHandler Started;

}

The component is started by calling BeginStart(). This method returns immediately, and later, when the component has completed startup, the Started event fires.

I want to create a synchronous start method by wrapping the component and wait until the Started event is fired. This is what I've come up with so far:

class ComponentWrapper {

  readonly Component component = new Component();

  void StartComponent() {
    var componentStarted =
      Observable.FromEvent<EventArgs>(this.component, "Started");
    using (var startedEvent = new ManualResetEvent(false))
      using (componentStarted.Take(1).Subscribe(e => { startedEvent.Set(); })) {
        this.componenet.BeginStart();
        startedEvent.WaitOne();
      }
  }

}

I would like to get rid of the ManualResetEvent, and I expect that Rx has a solution. But how?

like image 981
Martin Liversage Avatar asked Jun 02 '10 08:06

Martin Liversage


2 Answers

PL's answer if perfectly good for your spec, but I thought you might get better results by not fighting RX with .First() but embracing it with creating an observable to your component:

    public static IObservable<Unit> AsObservable(this Component component)
    {
        return Observable.Defer(() =>
        {
            component.BeginStart();
            return Observable
                .FromEvent<EventArgs>(component, "Started")
                .Select(_ => new Unit());
        });
    }

Then you could use it as blocking:

new Component().AsObservable().First();

Non - blocking:

new Component().AsObservable().Subscribe(_ => Console.WriteLine("Done"));

Hot:

var pub = new Component().AsObservable().Publish();
pub.Subscribe(_ => Console.WriteLine("Sub1"));
pub.Subscribe(_ => Console.WriteLine("Sub2"));
pub.Connect();  // started just once per two subscriptions

Composable:

new Component().AsObservable().Delay(TimeSpan.FromSeconds(1));

etc...

EDIT: For the case of multiple events that you have to wait on and collect information, the following variation could be used:

public static IObservable<EventArgs> AsObservable(this Component component)
{
    return Observable.Defer(() =>
    {
        component.BeginStart();
        return 
            Observable.FromEvent<EventArgs>(component, "Started1").Take(1)
                .Merge(
            Observable.FromEvent<EventArgs>(component, "Started2").Take(1))
                .Select(evt => evt.EventArgs);
    });
}

With this one, if you want to block till completion, you might use .AsObservable.Last().

like image 97
Sergey Aldoukhov Avatar answered Nov 05 '22 01:11

Sergey Aldoukhov


Something like this should do it:

var replay = Observable
    .FromEvent<EventArgs>(this.component, "Started")
    .Replay();
replay.Connect();
component.BeginStart();
replay.First();
like image 29
PL. Avatar answered Nov 05 '22 02:11

PL.