I have a problem for which Reactive Extensions seems particularly well suited. I have an event source that creates events in short bursts with relatively long idle periods in between. I would like to group those events into batches where (ideally) each burst of events ends up in one batch. Using RxJava, is there a good way to do this? Observable.buffer(Observable) or Observable.buffer(Func0) seem promising, but it might also be possible with Observable.window() or Observable.groupByUntil().
Any of the operators you mentioned will work; it just depends on the semantics you want.
If you want each group to be a List then use buffer: https://github.com/Netflix/RxJava/wiki/Transforming-Observables#buffer
If you want each group to be a sequence (an Observable) then use window: https://github.com/Netflix/RxJava/wiki/Transforming-Observables#window
The two are very similar; only their output differs. Window lets you process each item in a group as it is emitted, whereas buffer waits until all items in the group are collected and then emits them together. So if you want to process all items in each group together, use buffer. Alternatively, window can be combined with scan to process items sequentially within each window and perform stateful operations on them as they are emitted.
groupByUntil is probably not what you're looking for if time is what you're grouping on, since groupBy/groupByUntil is about grouping on keys.
The one problem I can see with buffer/window is that they typically have fixed intervals, or require you to specify via another Observable when each window starts and ends. You seem to want something that triggers after a period of idleness, which is more like debounce, but debounce will only give you the last value, not the whole group.
You could do a complicated combination of these, by multicasting the stream, routing one through buffer, another through debounce, and using the debounce output to signal the window start/end points. That's pretty tricky though and I've never tried it.
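To make the target semantics concrete, here is a minimal plain-Java sketch of the batching rule itself: a batch closes once the gap since the previous event reaches some idle threshold. It does not use RxJava at all; the class IdleGapBatcher, the method batchByIdleGap, and the 100 ms threshold are hypothetical names and values chosen only for illustration, and the input is assumed to be a list of ascending timestamps in milliseconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of RxJava: it only illustrates the batching rule.
final class IdleGapBatcher {

    // Splits ascending event timestamps (ms) into batches. A new batch starts
    // whenever the gap since the previous event is at least idleGapMillis.
    static List<List<Long>> batchByIdleGap(List<Long> timestamps, long idleGapMillis) {
        List<List<Long>> batches = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        Long previous = null;
        for (Long t : timestamps) {
            if (previous != null && t - previous >= idleGapMillis) {
                // Idle period detected: close the current batch.
                batches.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
            previous = t;
        }
        // Flush the trailing batch, mirroring a flush on stream completion.
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Long> events = Arrays.asList(0L, 10L, 20L, 500L, 510L, 1200L);
        System.out.println(batchByIdleGap(events, 100L));
        // prints [[0, 10, 20], [500, 510], [1200]]
    }
}
```

In RxJava terms, the multicast combination described above is often written roughly as source.publish(shared -> shared.buffer(shared.debounce(gap, TimeUnit.MILLISECONDS))). That one-liner assumes the publish(selector), buffer(boundary) and debounce(long, TimeUnit) overloads of RxJava 1.x and is untested here, so check it against the version you are using.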
Do the existing buffer/window use cases work for you, or do you need a bufferedDebounce behavior?