I need to implement an event processing, that is done delayed when there are no new events arriving for a certain period. (I have to queue up a parsing task when the text buffer changed, but I don't want to start the parsing when the user is still typing.)
I'm new in RX, but as far as I see, I would need a combination of BufferWithTime and the Timeout methods. I imagine this to be working like this: it buffers the events until they are received regularly within a specified time period between the subsequent events. If there is a gap in the event flow (longer than the timespan) it should return propagate the events buffered so far.
Having a look at how Buffer and Timeout is implemented, I could probably implement my BufferWithTimeout method (if everyone have one, please share with me), but I wonder if this can be achieved just by combining the existing methods. Any ideas?
This is quite an old question, but I do believe the following answer is worth mentioning, since all other solutions have forced the user to subscribe manually, track changes, etc.
I offer the following as an "Rx-y" solution.
var buffers = source
.GroupByUntil(
// yes. yes. all items belong to the same group.
x => true,
g => Observable.Amb<int>(
// close the group after 5 seconds of inactivity
g.Throttle(TimeSpan.FromSeconds(5)),
// close the group after 10 items
g.Skip(9)
))
// Turn those groups into buffers
.SelectMany(x => x.ToArray());
Basically, the source is windowed until some observerable defined in terms of the newest window. A new window (grouped observable) is created, and we use that window to determine when the window should close. In this case, I'm closing the window after 5 seconds of inactivity or a maximum length of 10 (9+1).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With