Using Reactive Extensions, I want to ignore messages coming from my event stream that occur while my Subscribe method is running. That is, it sometimes takes me longer to process a message than the time between messages, so I want to drop the messages I don't have time to process.
However, when my Subscribe method completes, if any messages did come through I want to process the last one. So I always process the most recent message.
So, if I have some code which does:
    messages.OnNext(100);
    messages.OnNext(1);
    messages.OnNext(2);
and, assuming the '100' takes a long time to process, I want the '2' to be processed when the '100' completes. The '1' should be ignored because it was superseded by the '2' while the '100' was still being processed.
Here's an example of the result I want, using a background task and Latest():
    var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
    Task.Factory.StartNew(() =>
    {
        foreach (var n in messages.Latest())
        {
            Thread.Sleep(TimeSpan.FromMilliseconds(250));
            Console.WriteLine(n);
        }
    });
However, Latest() is a blocking call and I'd prefer not to have a thread sitting waiting for the next value like this (there will sometimes be very long gaps between messages).
I can also get the result I want by using a BroadcastBlock from TPL Dataflow, like this:
    var buffer = new BroadcastBlock<long>(n => n);
    Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
    buffer.AsObservable()
        .Subscribe(n =>
        {
            Thread.Sleep(TimeSpan.FromMilliseconds(250));
            Console.WriteLine(n);
        });
but this feels like it should be possible directly in Rx. What's the best way to go about doing it?
Here is a method that is similar to Dave's but uses Sample instead (which is more appropriate than Buffer). I've included a similar extension method to the one I added to Dave's answer.
The extension:
    public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
    {
        var sampler = new Subject<Unit>();
        var sub = source.
            Sample(sampler).
            ObserveOn(Scheduler.ThreadPool).
            Subscribe(l =>
            {
                action(l);
                sampler.OnNext(Unit.Default);
            });
        // start sampling when we have a first value
        source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));
        return sub;
    }
Note that it's simpler, and there is no 'empty' buffer that's fired. The first element that is sent to the action actually comes from the stream itself.
Usage is straightforward:
    messages.SubscribeWithoutOverlap(n =>
    {
        Console.WriteLine("start: " + n);
        Thread.Sleep(500);
        Console.WriteLine("end: " + n);
    });
    messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing
And results:
source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10
Thanks to Lee Campbell (of Intro To Rx fame), I now have a working solution using this extension method:
    public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
    {
        return Observable.Create<T>(observer =>
        {
            Notification<T> outsideNotification = null;
            var gate = new object();
            bool active = false;
            var cancelable = new MultipleAssignmentDisposable();
            var disposable = source.Materialize().Subscribe(thisNotification =>
            {
                bool alreadyActive;
                lock (gate)
                {
                    alreadyActive = active;
                    active = true;
                    outsideNotification = thisNotification;
                }
                if (!alreadyActive)
                {
                    cancelable.Disposable = scheduler.Schedule(self =>
                    {
                        Notification<T> localNotification = null;
                        lock (gate)
                        {
                            localNotification = outsideNotification;
                            outsideNotification = null;
                        }
                        localNotification.Accept(observer);
                        bool hasPendingNotification = false;
                        lock (gate)
                        {
                            hasPendingNotification = active = (outsideNotification != null);
                        }
                        if (hasPendingNotification)
                        {
                            self();
                        }
                    });
                }
            });
            return new CompositeDisposable(disposable, cancelable);
        });
    }
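The answer above does not show a call site, so here is a minimal usage sketch. It assumes the ObserveLatestOn extension is in scope in a static class, that the Rx package (System.Reactive) is referenced, and that TaskPoolScheduler.Default is an acceptable scheduler for the slow handler; the interval and sleep durations are simply the ones from the question.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main()
    {
        // Fast producer: one value every 100 ms (same source as in the question).
        var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

        // Assumption: ObserveLatestOn is the extension method shown above.
        // The handler runs on the task pool; while it is busy, newer values
        // overwrite the single pending slot, so only the latest one is kept.
        using (messages
            .ObserveLatestOn(TaskPoolScheduler.Default)
            .Subscribe(n =>
            {
                Thread.Sleep(TimeSpan.FromMilliseconds(250)); // simulate slow processing
                Console.WriteLine(n);
            }))
        {
            Console.ReadLine(); // keep the process alive until Enter is pressed
        }
    }
}
```

No thread blocks between messages here: the scheduled action only runs when a notification is pending, which is the property the question asked for over Latest().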