Reactive Extensions: Process events in batches + add delay between every batch

I have an application which at some points raises 1000 events almost at the same time. What I would like to do is to batch the events into chunks of 50 items and start processing one chunk every 10 seconds. There's no need to wait for a batch to complete before starting to process the next one.

For example:

10:00:00: 1000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:20: StartProcessing (events.Skip(100).Take(50))

Any ideas how to achieve this? I suppose Reactive Extensions is the way to go but other solutions are acceptable too.

I tried to start from here:

        var bufferedItems = eventAsObservable
            .Buffer(15)
            .Delay(TimeSpan.FromSeconds(5));

But I noticed that the delay didn't work as I had hoped: all the batches started simultaneously, just 5 seconds later.
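For reference, Delay shifts every notification by the same amount of time, so the gaps between buffers are preserved and a burst of buffers still arrives as a burst, only later. To stagger the batches, each buffer needs its own delay that grows with its position. The sketch below assumes the System.Reactive package; `StaggeredBatching`, `StaggerBatches` and `Demo` are hypothetical names. It delays the i-th buffer by i * interval and replays a 120-event burst on a `HistoricalScheduler` so the timing can be checked on a virtual clock. It only fits the single-burst scenario: the index never resets, so buffers produced much later get ever larger delays.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class StaggeredBatching
{
    // Hypothetical helper: delays the i-th buffer by i * interval, measured
    // from the moment that buffer is produced.
    // Assumption: suited to a single burst; the delay keeps growing with the index.
    public static IObservable<IList<T>> StaggerBatches<T>(this IObservable<T> source,
        int count, TimeSpan interval, IScheduler scheduler)
    {
        return source
            .Buffer(count)
            .Select((batch, index) => Observable.Return(batch)
                .Delay(TimeSpan.FromTicks(interval.Ticks * index), scheduler))
            .Merge();
    }

    // Replays a burst of 120 events on a virtual clock and records, for each
    // batch, the elapsed seconds at which it was delivered and its size.
    public static List<(double Seconds, int Size)> Demo()
    {
        var scheduler = new HistoricalScheduler();
        var start = scheduler.Now;
        var log = new List<(double Seconds, int Size)>();

        Observable.Range(1, 120)
            .StaggerBatches(50, TimeSpan.FromSeconds(10), scheduler)
            .Subscribe(batch => log.Add(((scheduler.Now - start).TotalSeconds, batch.Count)));

        // Run everything scheduled within the first minute of virtual time.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(60));
        return log;
    }
}
```

With a 10-second interval, `Demo()` records batches of 50, 50 and 20 items at 0, 10 and 20 seconds of virtual time.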

I also tested the Window method, but I didn't notice any difference in behavior. I suppose the TimeSpan in Window actually means "take every event that happens in the next 10 seconds":

        var bufferedItems = eventAsObservable
            .Window(TimeSpan.FromSeconds(10), 5)
            .SelectMany(x => x)
            .Subscribe(DoProcessing);

I'm using the Rx-Main 2.0.20304-beta.

Mikael Koskinen asked Jun 07 '12 07:06


3 Answers

This is a surprisingly difficult problem to solve, more so because the enticing idea of using the Zip operator to align the observable with an Observable.Interval is buggy and dangerously inefficient. The main problem with the Zip operator, when used with asymmetric observables, is that it buffers the elements of the faster-producing observable, potentially resulting in massive memory allocation during a long-lived subscription. IMHO the use of this operator should be limited to pairs of observables that are expected to produce an equal (or nearly equal) number of elements in the long run.

The buggy behavior of the Zip+Observable.Interval combo emerges when the Observable.Interval emits values faster than the source observable. In that case the superfluous values emitted by the Observable.Interval are buffered, so when the source observable emits its next element there is already a buffered Interval value to pair it with, which violates the "minimum interval between elements" policy.
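The failure mode described above can be reproduced deterministically. The sketch below assumes the System.Reactive package; `ZipIntervalPitfall` and `Demo` are hypothetical names, and the buffer size of 2 is chosen only to keep it short. It zips 2-item buffers with a 10-second `Observable.Interval` running on a `HistoricalScheduler`. The source stays quiet for 60 seconds, so six ticks pile up inside Zip; a burst of six events then forms three buffers that are all paired with queued ticks and delivered at the same instant.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipIntervalPitfall
{
    // Pairs 2-item buffers with a 10-second Interval, as in the Zip approach,
    // and records the virtual time (in seconds) at which each buffer arrives.
    public static List<double> Demo()
    {
        var scheduler = new HistoricalScheduler();
        var start = scheduler.Now;
        var source = new Subject<int>();
        var arrivals = new List<double>();

        var tick = Observable.Interval(TimeSpan.FromSeconds(10), scheduler);
        source.Buffer(2)
            .Zip(tick, (batch, _) => batch)
            .Subscribe(_ => arrivals.Add((scheduler.Now - start).TotalSeconds));

        // A quiet minute: six ticks fire and are queued inside Zip.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(60));

        // A burst of six events arrives at t = 60 s and forms three buffers,
        // each of which finds a queued tick waiting for it.
        for (int i = 0; i < 6; i++) source.OnNext(i);

        return arrivals;
    }
}
```

`Demo()` returns three arrival times that are all 60 seconds, so no spacing is applied between those batches.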

Below is an implementation of a custom WithInterval operator that imposes a minimum interval between consecutive elements of an observable sequence. This operator will then be used to solve the specific problem of this question, which involves buffers rather than individual elements:

/// <summary>Imposes a minimum interval between consecutive elements of an
/// observable sequence.</summary>
public static IObservable<T> WithInterval<T>(this IObservable<T> source,
    TimeSpan interval, IScheduler scheduler = null)
{
    return source
        .Scan((Observable.Return(0L), (IObservable<T>)null), (state, x) =>
        {
            var (previousTimer, _) = state;
            var timer = (scheduler != null ? Observable.Timer(interval, scheduler)
                : Observable.Timer(interval)).PublishLast();
            var delayed = previousTimer.Select(_ => x).Finally(() => timer.Connect());
            return (timer, delayed);
        })
        .Select(e => e.Item2)
        .Concat();
}

This implementation places an Observable.Timer between consecutive elements. The tricky part is activating each timer at exactly the right moment. This is achieved by publishing the timers with PublishLast, and having each delayed element start (Connect) the next timer once it has been emitted.

With this operator in place, implementing a custom BatchWithInterval operator is trivial:

/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers which are produced based on element count information,
/// imposing a minimum interval between consecutive buffers.</summary>
public static IObservable<IList<T>> BatchWithInterval<T>(this IObservable<T> source,
    int count, TimeSpan interval, IScheduler scheduler = null)
{
    return source.Buffer(count).WithInterval(interval, scheduler);
}

Usage example:

var subscription = eventAsObservable
    .BatchWithInterval(50, TimeSpan.FromSeconds(10))
    .Subscribe(DoProcessing);
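For comparison, another common way to impose a minimum gap in Rx is to follow every item with a silent timer and serialize the results with Concat. This is a different technique from the WithInterval operator above, sketched here under the assumption that the System.Reactive package is referenced; `SpacedBatches`, `BatchSpaced` and `Demo` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SpacedBatches
{
    // Alternative minimum-spacing technique (not the WithInterval operator):
    // every buffer is emitted immediately and then followed by a silent timer,
    // and Concat makes the next buffer wait until that timer has completed.
    public static IObservable<IList<T>> BatchSpaced<T>(this IObservable<T> source,
        int count, TimeSpan interval, IScheduler scheduler)
    {
        return source
            .Buffer(count)
            .Select(batch => Observable.Return(batch)
                .Concat(Observable.Timer(interval, scheduler)
                    .IgnoreElements()
                    .Select(_ => batch))) // never runs; only adapts the element type
            .Concat();
    }

    // Replays a burst of 120 events on a virtual clock and records when each
    // batch is delivered (elapsed seconds) and how many items it holds.
    public static List<(double Seconds, int Size)> Demo()
    {
        var scheduler = new HistoricalScheduler();
        var start = scheduler.Now;
        var log = new List<(double Seconds, int Size)>();

        Observable.Range(1, 120)
            .BatchSpaced(50, TimeSpan.FromSeconds(10), scheduler)
            .Subscribe(batch => log.Add(((scheduler.Now - start).TotalSeconds, batch.Count)));

        scheduler.AdvanceBy(TimeSpan.FromSeconds(60));
        return log;
    }
}
```

`Demo()` records batches of 50, 50 and 20 items at 0, 10 and 20 seconds. One visible trade-off of this sketch is that the resulting sequence completes only after the timer following the last buffer has elapsed, so completion is held back by one interval.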
Theodor Zoulias answered Oct 12 '22 13:10


If you'd prefer not to sleep threads, you can do this:

var tick = Observable.Interval(TimeSpan.FromSeconds(5));

eventAsObservable
    .Buffer(50)
    .Zip(tick, (res, _) => res)
    .Subscribe(DoProcessing);
yamen answered Oct 22 '22 02:10


There's a specific Buffer method overload just for this: https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx

observable.Buffer(TimeSpan.FromSeconds(5), 50);
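One caveat worth checking: this overload completes a buffer when it is full or when the time span has elapsed, whichever comes first. It caps the batch size, but it does not impose a gap between batches, so a burst of events still produces full buffers back to back. The sketch below assumes the System.Reactive package; `TimeOrCountBuffer` and `Demo` are hypothetical names. It pushes 120 events into `Buffer(TimeSpan.FromSeconds(5), 50, scheduler)` on a `HistoricalScheduler` without advancing the virtual clock.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TimeOrCountBuffer
{
    // Pushes a burst of 120 events at virtual time 0 into the time-or-count
    // Buffer overload and records the sizes of the buffers delivered
    // before the clock moves at all.
    public static List<int> Demo()
    {
        var scheduler = new HistoricalScheduler();
        var source = new Subject<int>();
        var sizes = new List<int>();

        source.Buffer(TimeSpan.FromSeconds(5), 50, scheduler)
            .Subscribe(batch => sizes.Add(batch.Count));

        for (int i = 0; i < 120; i++) source.OnNext(i);

        // No AdvanceBy call: everything recorded so far was emitted at t = 0.
        return sizes;
    }
}
```

`Demo()` returns two buffers of 50 items, both emitted at virtual time zero; the remaining 20 items wait for the time span to elapse.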
Sámal Rasmussen answered Oct 22 '22 01:10