I have a sequence of events that happen every 10-1000 ms. I subscribe to this source of events, but want to handle them at a fixed (or minimum) interval of 500ms. I also want to process ONE event at a time, not in batches (like Buffer(x > 1)).
Something like this in pseudo code:
observable.MinimumInterval(TimeSpan.FromMiliseconds(500)).Subscribe(v=>...);
Tried e.g.:
observable.Buffer(1).Delay(TimeSpan.FromMiliseconds(500).Subscribe(v=>...);
and a lot of other potential solutions. No luck so far.
Any ideas?
I answered this very question on my blog here.
Reproducing (in case of link rot!) with the addition of presenting as an extension method:
Sometimes, you want to limit the rate at which events arrive from an Rx stream.
The Throttle operator will suppress an event if another arrives within a specified interval. This is very useful in many instances, but it does have two important side-effects – even an unsuppressed event will be delayed by the interval, and events will get dropped altogether if they arrive too quickly.
I came across a situation where both of these were unacceptable. In this particular case, the desired behaviour was as follows: The events should be output at a maximum rate specified by a TimeSpan, but otherwise as soon as possible.
One solution works like this. Imagine our input stream is a bunch of people arriving at a railway station. For our output, we want people leave the station at a maximum rate. We set the maximum rate by having each person stand at the front of a flatbed railroad truck and sending that truck out of the station at a fixed speed. Because there is only one track, and all trucks travel at the same speed and have the same length, people will leave the station at a maximum rate when trucks are departing back-to-back. However, if the track is clear, the next person will be able to depart immediately.
So how do we translate this metaphor into Rx?
We will use the Concat operator’s ability to accept a stream of streams and merge them together back-to-back – just like sending railroad trucks down the track.
To get the equivalent of each person onto a railroad truck, we will use a Select to project each event (person) to an observable sequence (railroad truck) that starts with a single OnNext event (the person) and ends with an OnComplete exactly the defined interval later.
Lets assume the input events are an IObservable in the variable input. Here’s the code:
var paced = input.Select(i => Observable.Empty<T>()
.Delay(interval)
.StartWith(i)).Concat();
As an extension method this becomes:
public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
{
return source.Select(i => Observable.Empty<T>()
.Delay(interval)
.StartWith(i)).Concat();
}
Since you want to keep all events, I think you're on the right track with Buffer
. But you should call it with a TimeSpan
...
observable.Buffer(TimeSpan.FromMiliseconds(500)).Subscribe(v=>...);
... where v
is an IList<TSource>
that you can loop over.
Your original call of Buffer(1)
will fire whenever it gets 1 event, which is the same as if it wasn't there at all. Buffering with a time window will collect all the events that fire within the interval and give it to you at the end of each interval.
This is my attempt:
public static IObservable<T> MinimumInterval<T>(this IObservable<T> source, TimeSpan rate, IScheduler scheduler = null)
{
if (scheduler == null)
scheduler = TaskPoolScheduler.Default;
Func<IObserver<T>, IDisposable> subscribe = obs => {
var nextTick = scheduler.Now;
var subscriptions = new CompositeDisposable();
Action<T> onNext = value =>
{
var sendTime = Max(nextTick, scheduler.Now);
var disp = new SingleAssignmentDisposable();
disp.Disposable = scheduler.Schedule(sendTime, () =>
{
subscriptions.Remove(disp);
obs.OnNext(value);
});
subscriptions.Add(disp);
nextTick = sendTime + rate;
};
Action<Exception> onError = err => { subscriptions.Dispose(); obs.OnError(err); };
Action onCompleted = () => { subscriptions.Dispose(); obs.OnCompleted(); };
var listener = Observer.Create(onNext, onError, onCompleted);
subscriptions.Add(source.Subscribe(listener));
return subscriptions;
};
return Observable.Create<T>(subscribe);
}
It keeps track of the earliest the next message can be sent and uses the scheduler to delay events if they occur too soon. The CompositeDisposable ensures scheduled events are cancelled when the listener unsubscribes.
Constructive feedback is welcome.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With