I have an application which at some points raises 1000 events almost at the same time. What I would like to do is batch the events into chunks of 50 items and start processing one chunk every 10 seconds. There's no need to wait for a batch to complete before starting to process the next one.
For example:
10:00:00: 1000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:20: StartProcessing (events.Skip(100).Take(50))
Any ideas on how to achieve this? I suppose Reactive Extensions is the way to go, but other solutions are acceptable too.
I tried starting from here:
var bufferedItems = eventAsObservable
    .Buffer(15)
    .Delay(TimeSpan.FromSeconds(5));
But I noticed that the delay didn't work as I had hoped: all the batches still started simultaneously, just 5 seconds later.
I also tested the Window method, but I didn't notice any difference in behavior. I suppose the TimeSpan in Window actually means "take every event that happens in the next 10 seconds":
var bufferedItems = eventAsObservable
.Window(TimeSpan.FromSeconds(10), 5)
.SelectMany(x => x)
.Subscribe(DoProcessing);
I'm using the Rx-Main 2.0.20304-beta.
This is a surprisingly difficult problem to solve, more so because the enticing idea of using the Zip operator to align the observable with an Observable.Interval is buggy and dangerously inefficient. The main problem with the Zip operator, when used with asymmetric observables, is that it buffers the elements of the faster-producing observable, potentially resulting in massive memory allocation during a long-lived subscription. IMHO the use of this operator should be limited to pairs of observables that are expected to produce an equal (or close to equal) number of elements in the long run.
The buggy behavior of the Zip + Observable.Interval combo emerges when the Observable.Interval emits values faster than the source observable. In that case the superfluous values emitted by the Observable.Interval are buffered, so when the source observable emits its next element there is already a buffered Interval value to form a pair, resulting in a violation of the "minimum interval between elements" policy.
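To make that failure mode concrete, here is a small discrete-time model in plain C# (no Rx dependency). It is a simplification, not Rx itself: it assumes the k-th Interval tick fires at (k + 1) * period, that Zip emits pair k as soon as both the k-th source element and the k-th tick exist, and it ignores scheduler overhead. The names ZipEmissionTimes and burstArrivals are hypothetical, chosen for illustration.

```csharp
using System;
using System.Linq;

// Hypothetical model of source.Zip(Observable.Interval(period), ...):
// pair k is emitted at max(arrival of source element k, time of tick k).
// Ticks that have no partner yet are simply "buffered" by this max().
double[] ZipEmissionTimes(double[] sourceArrivals, double period) =>
    sourceArrivals
        .Select((arrival, k) => Math.Max(arrival, (k + 1) * period))
        .ToArray();

// Two elements arrive immediately, then the source stays quiet until t = 100
// while Interval keeps ticking (at 30, 40, 50, ...) and Zip keeps those ticks.
double[] burstArrivals = { 0, 0, 100, 100, 100 };
double[] zipped = ZipEmissionTimes(burstArrivals, period: 10);

Console.WriteLine(string.Join(", ", zipped)); // 10, 20, 100, 100, 100
```

The last three elements are all paired with ticks that were already waiting, so they come out together at t = 100 instead of 10 time units apart.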
Below is an implementation of a custom WithInterval operator that imposes a minimum interval between consecutive elements of an observable sequence. This operator will then be used to solve the specific problem of this question, which involves buffers instead of individual elements:
/// <summary>Imposes a minimum interval between consecutive elements of an
/// observable sequence.</summary>
public static IObservable<T> WithInterval<T>(this IObservable<T> source,
TimeSpan interval, IScheduler scheduler = null)
{
return source
.Scan((Observable.Return(0L), (IObservable<T>)null), (state, x) =>
{
var (previousTimer, _) = state;
var timer = (scheduler != null ? Observable.Timer(interval, scheduler)
: Observable.Timer(interval)).PublishLast();
var delayed = previousTimer.Select(_ => x).Finally(() => timer.Connect());
return (timer, delayed);
})
.Select(e => e.Item2)
.Concat();
}
This implementation places an Observable.Timer between consecutive elements. The tricky part is activating each timer at exactly the right moment. This is achieved by publishing the timers (PublishLast), and having each timer start (Connect) the next one when it completes.
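Reading the operator above, the first element passes through as soon as it arrives (its "previous timer" is Observable.Return(0L)), and each later element is released once it has arrived and the timer connected at the previous emission has elapsed. The plain C# sketch below models that rule as emitted[i] = max(arrival[i], emitted[i - 1] + interval). It is a simplified timing model under that reading, not the operator itself, and WithIntervalEmissionTimes is a hypothetical name.

```csharp
using System;
using System.Linq;

// Hypothetical timing model of WithInterval (not Rx itself):
// - element 0 is emitted on arrival;
// - element i waits for its own arrival AND for `interval` to pass
//   since element i - 1 was emitted.
double[] WithIntervalEmissionTimes(double[] arrivals, double interval)
{
    var result = new double[arrivals.Length];
    for (int i = 0; i < arrivals.Length; i++)
    {
        result[i] = i == 0
            ? arrivals[0]
            : Math.Max(arrivals[i], result[i - 1] + interval);
    }
    return result;
}

// Same bursty input as a quiet period followed by a burst.
double[] burstArrivals = { 0, 0, 100, 100, 100 };
double[] schedule = WithIntervalEmissionTimes(burstArrivals, interval: 10);

Console.WriteLine(string.Join(", ", schedule)); // 0, 10, 100, 110, 120
```

Because the timer is only started when the previous element is actually emitted, idle periods do not "bank" extra slots, so a later burst is still spaced at least 10 units apart.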
With this operator in place, implementing a custom BatchWithInterval operator is trivial:
/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers which are produced based on element count information,
/// imposing a minimum interval between consecutive buffers.</summary>
public static IObservable<IList<T>> BatchWithInterval<T>(this IObservable<T> source,
int count, TimeSpan interval, IScheduler scheduler = null)
{
return source.Buffer(count).WithInterval(interval, scheduler);
}
Usage example:
var subscription = eventAsObservable
.BatchWithInterval(50, TimeSpan.FromSeconds(10))
.Subscribe(DoProcessing);
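As a worked example for the scenario in the question, the sketch below computes, in plain C# rather than Rx, when each batch would start under the same timing rule, assuming all 1000 events arrive at t = 0 and ignoring scheduler overhead. Buffer(50) turns the burst into 20 full buffers that are all available at t = 0, and the minimum-interval rule then spaces their start times 10 seconds apart.

```csharp
using System;

// Hypothetical worked example (not Rx itself) of
// eventAsObservable.BatchWithInterval(50, TimeSpan.FromSeconds(10))
// for a burst of 1000 events arriving at t = 0.
const int eventCount = 1000;
const int batchSize = 50;
const double intervalSeconds = 10;

int batchCount = eventCount / batchSize; // 1000 / 50 = 20 full buffers

double previous = double.NegativeInfinity; // no batch emitted yet
var startTimes = new double[batchCount];
for (int i = 0; i < batchCount; i++)
{
    double available = 0; // every buffer is already complete at t = 0
    startTimes[i] = Math.Max(available, previous + intervalSeconds);
    previous = startTimes[i];
}

Console.WriteLine($"{batchCount} batches, first at {startTimes[0]}s, last at {startTimes[^1]}s");
// 20 batches, first at 0s, last at 190s
```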
If you'd prefer not to sleep threads, you can do this:
var tick = Observable.Interval(TimeSpan.FromSeconds(5));
eventAsObservable
.Buffer(50)
.Zip(tick, (res, _) => res)
.Subscribe(DoProcessing);
There's a specific Buffer method overload just for this: https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx
observable.Buffer(TimeSpan.FromSeconds(5), 50);