
Action to event

I have an Action<Action> operation which does some long-running work and sends heartbeats through the provided Action argument. Before I invoke it, I want to set up an IObservable<Unit> that produces a Next element each time the operation sends a heartbeat. How do I do that?

Creating the IObservable itself is not that complicated. I'm guessing the conceptually easiest way would be to create it from an event, but sadly you cannot declare an event in a method body (unless you know a way). I could use Observable.FromEvent(Action<Action> addHandler, Action<Action> removeHandler), but that would require ugly closed-over Actions to forward the method calls to (see the example below).
Is there a more elegant way?

Action forward = null;
Action sendHeartbeat = () => { if (forward != null) forward(); };
//ugly, since it does not scale to multiple observers:
IObservable<Unit> heartbeatObs =
  Observable.FromEvent(handler => {
    forward = handler;
  }, _ => { forward = null; });
operation(sendHeartbeat);

Even if I replaced the single forward action with a collection of them, it would still be ugly, because I'd be re-implementing the += and -= operators of event handlers all over again.
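For what it's worth, a local delegate variable is already multicast, so += and -= can be applied to it directly inside the FromEvent lambdas without maintaining a handler collection by hand. The following is only a sketch under that assumption: the class name HeartbeatViaDelegate and the Run helper are hypothetical, and the code needs the System.Reactive package.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class HeartbeatViaDelegate
{
    // Hypothetical helper: runs `operation` with two observers attached and
    // returns how many heartbeats each of them received.
    public static (int first, int second) Run(Action<Action> operation)
    {
        // A delegate variable can hold any number of handlers (multicast).
        Action forward = null;
        Action sendHeartbeat = () => forward?.Invoke();

        // += and -= on the captured local replace a hand-written handler list.
        IObservable<Unit> heartbeats = Observable.FromEvent(
            handler => forward += handler,
            handler => forward -= handler);

        int first = 0, second = 0;
        using (heartbeats.Subscribe(_ => first++))
        using (heartbeats.Subscribe(_ => second++))
        {
            // Observers are in place before the operation starts.
            operation(sendHeartbeat);
        }
        return (first, second);
    }
}
```

Calling HeartbeatViaDelegate.Run(hb => { hb(); hb(); hb(); }) should deliver every heartbeat to both observers. The closure is still there, but it no longer limits the observable to a single subscriber.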

Asked Jan 01 '26 04:01 by derabbink


1 Answer

Not sure I've fully understood your question, but if I have, you could do something like this, which uses a Subject to transmit the heartbeats. You can find subjects in the namespace System.Reactive.Subjects.

 // set up a subject
 Subject<Unit> heartbeatSubject = new Subject<Unit>();

 // set up the heartbeat action to push units down the subject
 Action sendHeartbeat = () => heartbeatSubject.OnNext(Unit.Default);

 // use the heartbeat subject somehow
 heartbeatSubject.Subscribe(_ => Console.WriteLine("Operation produced a heartbeat"));

 // start the operation
 operation(sendHeartbeat);
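If the subject approach is kept, it may be worth handing consumers only an IObservable<Unit> (via AsObservable) and signalling completion once the operation returns. A minimal sketch, assuming the System.Reactive package; the class name HeartbeatViaSubject and the Run helper are made up for illustration:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HeartbeatViaSubject
{
    // Hypothetical helper: runs `operation` and reports how many heartbeats
    // were observed and whether completion was signalled afterwards.
    public static (int count, bool completed) Run(Action<Action> operation)
    {
        int count = 0;
        bool completed = false;

        using (var subject = new Subject<Unit>())
        {
            // AsObservable hides OnNext/OnCompleted from consumers.
            IObservable<Unit> heartbeats = subject.AsObservable();

            using (heartbeats.Subscribe(_ => count++, () => completed = true))
            {
                try
                {
                    operation(() => subject.OnNext(Unit.Default));
                }
                finally
                {
                    // Tell observers that no more heartbeats will arrive.
                    subject.OnCompleted();
                }
            }
        }
        return (count, completed);
    }
}
```

Only the code that owns the subject can push heartbeats; everything else sees a read-only stream that ends when the operation does.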

Edit

As mentioned in your comment, and in line with the official Rx guidance, subjects are best avoided outside demo/example code. I've just had a go with Observable.Create, which might be better?

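// Stand-in for the real operation: sends 100 heartbeats, one per second.
Action<Action> longRunningAction = hb =>
{
    for (int i = 0; i < 100; i++) { hb(); Thread.Sleep(1000); }
};

var ob = Observable.Create<Unit>(
(IObserver<Unit> observer) =>
{
    // Runs synchronously inside Subscribe and forwards each heartbeat as OnNext.
    longRunningAction(() => observer.OnNext(Unit.Default));
    // Invoked when the subscription is disposed.
    return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
});

// Note: this call blocks until longRunningAction returns.
ob.Subscribe(_ => Console.WriteLine("Received a heartbeat"));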
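One caveat with Observable.Create used this way is that every Subscribe call invokes the operation again. If the goal is to attach several observers first and then run the operation exactly once, Publish and Connect can be layered on top. This is a sketch rather than a definitive implementation: SharedHeartbeats, Wrap and Demo are hypothetical names, the System.Reactive package is assumed, and the operation still runs synchronously on the thread that calls Connect.

```csharp
using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharedHeartbeats
{
    // Hypothetical helper: wraps the operation so that it runs once, when
    // Connect() is called, and every heartbeat reaches all current observers.
    public static IConnectableObservable<Unit> Wrap(Action<Action> operation)
    {
        return Observable.Create<Unit>(observer =>
        {
            operation(() => observer.OnNext(Unit.Default));
            observer.OnCompleted();      // the operation has finished
            return Disposable.Empty;     // nothing left to clean up
        }).Publish();
    }

    public static void Demo()
    {
        var heartbeats = Wrap(hb => { hb(); hb(); });

        // Attach observers first; nothing runs yet.
        heartbeats.Subscribe(_ => Console.WriteLine("Observer A received a heartbeat"));
        heartbeats.Subscribe(_ => Console.WriteLine("Observer B received a heartbeat"));

        // Starts the operation (blocking until it returns).
        heartbeats.Connect();
    }
}
```

Publish relies on a subject internally, so this does not avoid subjects so much as keep them out of your own code.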
Answered Jan 02 '26 16:01 by Daniel Neal


