
Force an Observable.Buffer to flush in C#

Is there any way to force an Observable.Buffer to flush before the buffer period has elapsed?

In the example:

mSubscription = mFluxObservable.Buffer(new TimeSpan(0, 0, 1, 30)).Subscribe(o => saver(o, iSessionId));

I want to be able to flush the buffered data before the 1 minute 30 second period has elapsed.

leonardonsantana asked Oct 17 '12 23:10


2 Answers

This worked for me:

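using System.Reactive;           // Unit
using System.Reactive.Linq;      // Observable, Buffer, Amb, Timer
using System.Reactive.Subjects;  // Subject<T>

// Signal used to request a flush on demand.
var subject = new Subject<Unit>();
// Cold timer: it starts over each time a new buffer subscribes to it.
var closing = Observable
    .Timer(new TimeSpan(0, 0, 1, 30))
    .Select(x => Unit.Default);

// Close each buffer on whichever fires first: the manual signal or the timer.
var query =
    mFluxObservable
        .Buffer(() => Observable
            .Amb(subject, closing)
            .Take(1));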

Now I only need to call subject.OnNext(Unit.Default) to force the buffer to flush. A new buffer is started immediately after the flush.
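
To make the flush behaviour concrete, here is a minimal, self-contained sketch of the same idea as a console program. It assumes the System.Reactive NuGet package is referenced. The names BufferWithFlush, values and flush are hypothetical helpers made up for this illustration and are not part of Rx; only Buffer, Amb, Timer, Take and Subject<T> come from the library.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class FlushableBufferExtensions
{
    // Hypothetical helper (not part of Rx): buffers `source` and closes the
    // current buffer when `period` elapses or when `flush` emits, whichever
    // happens first. A fresh timer is created for every new buffer.
    public static IObservable<IList<T>> BufferWithFlush<T>(
        this IObservable<T> source, TimeSpan period, IObservable<Unit> flush)
    {
        return source.Buffer(() =>
            Observable
                .Amb(flush, Observable.Timer(period).Select(_ => Unit.Default))
                .Take(1));
    }
}

static class Program
{
    static void Main()
    {
        var values = new Subject<int>();  // stands in for mFluxObservable
        var flush = new Subject<Unit>();  // manual flush signal

        using (values
            .BufferWithFlush(TimeSpan.FromSeconds(90), flush)
            .Subscribe(batch =>
                Console.WriteLine("Flushed: " + string.Join(", ", batch))))
        {
            values.OnNext(1);
            values.OnNext(2);
            flush.OnNext(Unit.Default);  // closes the buffer holding 1 and 2

            values.OnNext(3);
            flush.OnNext(Unit.Default);  // closes the buffer holding 3
        }
    }
}
```

Each call to flush.OnNext hands the subscriber the items collected since the previous flush, long before the 90 second timer would fire. Disposing the subscription does not emit whatever is still sitting in the open buffer, so a final flush is needed before disposal if that data matters.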

Enigmativity answered Nov 01 '22 18:11


...and basically the same principle using Window:

var bufferPeriod = TimeSpan.FromSeconds(1.5);
var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(50);

// Plain time-based buffering, for comparison. Dump() is LINQPad's output helper.
//source.Buffer(bufferPeriod).Dump();

var bufferFlush = new Subject<long>(); // or Subject<Unit>
source.Window(
        () => Observable.Merge(Observable.Timer(bufferPeriod), bufferFlush))
    .Select(window => window.ToList())
    .Dump();

// Simulate calling flush.
Observable.Interval(TimeSpan.FromMilliseconds(1350)).Take(2).Subscribe(bufferFlush);

Lee Campbell answered Nov 01 '22 18:11