Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is it possible to Observable.Buffer on something other than time

I've been looking for examples on how to use Observable.Buffer in rx but can't find anything more substantial than boiler plate time buffered stuff.

There does seem to be an overload to specify a "bufferClosingSelector" but I can't wrap my mind around it.

What I'm trying to do is create a sequence that buffers by time or by an "accumulation". Consider a request stream where every request has some sort of weight to it and I do not want to process more than x accumulated weight at a time, or if not enough has accumulated just give me what has come in the last timeframe(regular Buffer functionality)

like image 920
Dmitry Avatar asked Mar 05 '12 13:03

Dmitry


1 Answers

bufferClosingSelector is a function called every time to get an Observable which will produce a value when the buffer is expected to be closed.

For example,

source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1))) works like the regular Buffer(time) overload.

In you want to weight a sequence, you can apply a Scan over the sequence and then decide on your aggregating condition.

E.g., source.Scan((a,c) => a + c).SkipWhile(a => a < 100) gives you a sequence which produces a value when the source sequence has added up to more than 100.

You can use Amb to race these two closing conditions to see which reacts first:

        .Buffer(() => Observable.Amb
                     (
                          Observable.Timer(TimeSpan.FromSeconds(1)), 
                          source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
                     )
               )

You can use any series of combinators which produces any value for the buffer to be closed at that point.

Note: The value given to the closing selector doesn't matter - it's the notification that matters. So to combine sources of different types with Amb simply change it to System.Reactive.Unit.

Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit())
like image 177
Asti Avatar answered Oct 14 '22 18:10

Asti