Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Reactive Extensions: buffer until subscriber is idle

I have a program where I'm receiving events and want to process them in batches, so that all items that come in while I'm processing the current batch will appear in the next batch.

The simple TimeSpan and count based Buffer methods in Rx will give me multiple batches of items instead of giving me one big batch of everything that has come in (in cases when the subscriber takes longer than the specified TimeSpan or more than N items come in and N is greater than count).

I looked at using the more complex Buffer overloads that take Func<IObservable<TBufferClosing>> or IObservable<TBufferOpening> and Func<TBufferOpening, IObservable<TBufferClosing>>, but I can't find examples of how to use these, much less figure out how to apply them to what I'm trying to do.

like image 914
Chris Eldredge Avatar asked Nov 27 '12 21:11

Chris Eldredge


1 Answers

Does this do what you want?

var xs = new Subject<int>();
var ys = new Subject<Unit>();

var zss =
    xs.Buffer(ys);

zss
    .ObserveOn(Scheduler.Default)
    .Subscribe(zs =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(String.Join("-", zs));
        ys.OnNext(Unit.Default);
    });

ys.OnNext(Unit.Default);
xs.OnNext(1);
Thread.Sleep(200);
xs.OnNext(2);
Thread.Sleep(600);
xs.OnNext(3);
Thread.Sleep(400);
xs.OnNext(4);
Thread.Sleep(300);
xs.OnNext(5);
Thread.Sleep(900);
xs.OnNext(6);
Thread.Sleep(100);
xs.OnNext(7);
Thread.Sleep(1000);

My Result:

1-2-3
4-5
6-7
like image 129
Enigmativity Avatar answered Sep 21 '22 11:09

Enigmativity