I have an Action<Action> operation which does some long-running work and sends heartbeats through the provided Action argument.
Before I invoke it, I want to set up an IObservable<Unit> that produces a Next element each time the operation sends a heartbeat.
How do I do that?
Creating the IObservable is not that complicated, and I’m guessing the conceptually easiest way would be to create it from an event, but sadly you cannot declare an event in a method body (unless you know a way). I could use Observable.FromEvent(Action<Action> addHandler, Action<Action> removeHandler), but that would require ugly closed-over Actions to forward method calls to (see the example below).
Is there a more elegant way?
Action forward = null;
Action sendHeartbeat = () => { if (forward != null) forward(); };
//ugly, since it does not scale to multiple observers:
IObservable<Unit> heartbeatObs =
    Observable.FromEvent(
        handler => { forward = handler; },
        _ => { forward = null; });
operation(sendHeartbeat);
Even if I used a collection of forward actions instead of a single one, it would be ugly because I’d be reimplementing the += and -= operators of event handlers all over again.
Not sure I've fully understood your question, but if I have, you could do something like this, which uses a Subject to transmit the heartbeats. You can find subjects in the namespace System.Reactive.Subjects.
// set up a subject
Subject<Unit> heartbeatSubject = new Subject<Unit>();
// set up the heartbeat action to push units down the subject
Action sendHeartbeat = () => { heartbeatSubject.OnNext(Unit.Default); };
// use the heartbeat subject somehow.
heartbeatSubject.Subscribe(_ => Console.WriteLine("Operation produced a heartbeat"));
// start the operation
operation(sendHeartbeat);
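If it helps, here is a slightly fuller sketch of the same Subject idea as a small top-level program. It assumes the System.Reactive package is referenced; the `operation` delegate is a hypothetical stand-in for your real one, and `count` and `completed` exist only to show what a subscriber sees. It hands out the subject through AsObservable so consumers cannot call OnNext themselves, and it completes the stream once the operation returns.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for the real operation: sends three heartbeats and returns.
Action<Action> operation = hb => { for (int i = 0; i < 3; i++) hb(); };

var heartbeatSubject = new Subject<Unit>();
// Expose only the read side, so subscribers cannot push values into the subject.
IObservable<Unit> heartbeats = heartbeatSubject.AsObservable();

int count = 0;
bool completed = false;
heartbeats.Subscribe(_ => count++, () => completed = true);

try
{
    // Each heartbeat from the operation becomes one OnNext.
    operation(() => heartbeatSubject.OnNext(Unit.Default));
}
finally
{
    // Signal the end of the stream, even if the operation throws.
    heartbeatSubject.OnCompleted();
}

Console.WriteLine($"heartbeats: {count}, completed: {completed}");
```

Whether an exception should end the stream with OnCompleted or be forwarded with OnError is a design choice; this sketch only completes it.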
Edit
As mentioned in your comment, and according to the official Rx line, subjects are best avoided except in demo/example code. I've just had a go using Observable.Create, which might be better:
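// Example operation: sends 100 heartbeats, one per second.
Action<Action> longRunningAction =
    hb => { for (int i = 0; i < 100; i++) { hb(); Thread.Sleep(1000); } };
var ob = Observable.Create<Unit>(
    (IObserver<Unit> observer) =>
    {
        // Runs synchronously when an observer subscribes; each heartbeat becomes an OnNext.
        longRunningAction(() => observer.OnNext(Unit.Default));
        return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
    });
// Subscribing starts the operation; this call blocks until the operation finishes.
ob.Subscribe(_ => Console.WriteLine("Received a heartbeat"));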
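One thing to be aware of: with Observable.Create as written above, the operation runs once per subscriber, and it runs on the subscribing thread. If you want to set up several observers first and only then start the operation once, something along these lines might work. It is only a sketch, assuming the System.Reactive package; `operation` is again a hypothetical stand-in, and the counters and the wait handle are just there to make the result visible. Publish shares a single subscription among all observers, Connect is what actually invokes the operation, and SubscribeOn moves the work onto the task pool so Connect does not block.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical stand-in for the real operation: three heartbeats, then done.
Action<Action> operation = hb =>
{
    for (int i = 0; i < 3; i++) { hb(); Thread.Sleep(10); }
};

var heartbeats = Observable.Create<Unit>(
        (IObserver<Unit> observer) =>
        {
            operation(() => observer.OnNext(Unit.Default));
            observer.OnCompleted();
            // Nothing to clean up: the operation has already finished here.
            return Disposable.Empty;
        })
    .SubscribeOn(TaskPoolScheduler.Default) // run the operation off the calling thread
    .Publish();                             // share one run among all observers

int first = 0, second = 0;
using var done = new ManualResetEventSlim();

// Observers are attached before the operation is started.
heartbeats.Subscribe(_ => first++);
heartbeats.Subscribe(_ => second++, () => done.Set());

// Connect subscribes to the source, which starts the operation exactly once.
heartbeats.Connect();
done.Wait();

Console.WriteLine($"first: {first}, second: {second}");
```

Since an Action<Action> offers no cancellation hook, disposing a subscription here only stops delivery of heartbeats to that observer; the operation itself keeps running until it returns.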