Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement buffering with timeout in RX

I need to implement an event processing, that is done delayed when there are no new events arriving for a certain period. (I have to queue up a parsing task when the text buffer changed, but I don't want to start the parsing when the user is still typing.)

I'm new in RX, but as far as I see, I would need a combination of BufferWithTime and the Timeout methods. I imagine this to be working like this: it buffers the events until they are received regularly within a specified time period between the subsequent events. If there is a gap in the event flow (longer than the timespan) it should return propagate the events buffered so far.

Having a look at how Buffer and Timeout is implemented, I could probably implement my BufferWithTimeout method (if everyone have one, please share with me), but I wonder if this can be achieved just by combining the existing methods. Any ideas?

like image 741
Gaspar Nagy Avatar asked Jan 11 '11 08:01

Gaspar Nagy


1 Answers

This is quite an old question, but I do believe the following answer is worth mentioning, since all other solutions have forced the user to subscribe manually, track changes, etc.

I offer the following as an "Rx-y" solution.

var buffers = source
    .GroupByUntil(
        // yes. yes. all items belong to the same group.
        x => true,
        g => Observable.Amb<int>(
               // close the group after 5 seconds of inactivity
               g.Throttle(TimeSpan.FromSeconds(5)),
               // close the group after 10 items
               g.Skip(9)
             ))
    // Turn those groups into buffers
    .SelectMany(x => x.ToArray());

Basically, the source is windowed until some observerable defined in terms of the newest window. A new window (grouped observable) is created, and we use that window to determine when the window should close. In this case, I'm closing the window after 5 seconds of inactivity or a maximum length of 10 (9+1).

like image 116
cwharris Avatar answered Nov 15 '22 05:11

cwharris