I want to divide my stream into batches of a certain maximum size, and if that size isn't reached after a while, close the batch and start a new one. For that I tried using window(count):
things.window(10)
However this waits until 10 elements are received to emit a new Observable window. If I use the window(timespan, unit, count) operator:
things.window(1, TimeUnit.SECONDS, 10)
I will lose all the elements that arrive after the 10th and before the timespan completes.
I'd like a similar operator that, instead of waiting for the timespan to complete, emits a new window as soon as the count is reached.
things.windowXXX(timespan = 1s, count = 2) : Observable[T]
things: ----o--o--o-----------o----o------->
timespan: [ )[ 1s ][ )[ -->
window 1: -----o-o-|
window 2: -o---------|
window 3: --o-----o-|
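To make the semantics I'm after concrete, here is a minimal sketch in plain Java (no RxJava involved). It only simulates the batching rule over a list of timestamped items; it is not how any RxJava operator is implemented. The class name BatchSketch, the method batch, and the timestamps are all made up for illustration. It also assumes that the timer restarts whenever a batch is closed by count, and that empty batches are simply skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: simulates "close the batch on count OR on timespan".
class BatchSketch {

    // timesMs[i] is the (made-up) arrival time of items[i], in ascending order.
    static List<List<String>> batch(long[] timesMs, String[] items,
                                    long timespanMs, int maxCount) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long windowStart = 0; // assumption: the first window opens at t = 0

        for (int i = 0; i < items.length; i++) {
            long t = timesMs[i];

            // Close every window whose timespan elapsed before this item arrived.
            while (t >= windowStart + timespanMs) {
                if (!current.isEmpty()) { // assumption: empty batches are skipped
                    batches.add(current);
                    current = new ArrayList<>();
                }
                windowStart += timespanMs;
            }

            current.add(items[i]);

            // Count reached: close the batch right away and restart the timer.
            if (current.size() == maxCount) {
                batches.add(current);
                current = new ArrayList<>();
                windowStart = t;
            }
        }

        // Flush whatever is left when the source completes.
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        long[] times = {0, 1, 2, 32, 33};          // "4" arrives late
        String[] items = {"1", "2", "3", "4", "5"};
        // timespan = 10 ms, count = 2
        System.out.println(batch(times, items, 10, 2)); // prints [[1, 2], [3], [4, 5]]
    }
}
```

With these made-up timings, the first batch closes on count, the second closes on the timespan with a single element, and the late "4" ends up in a third batch together with "5".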
I found a way to work around this issue by using buffer(timespan, timeunit, count) instead of window. I thought buffer and window behaved the same, except that one emits lists and the other emits Observables, but there seems to be a difference here.
My solution was to use buffer and then map each result to an observable:
observable.buffer(10, TimeUnit.MILLISECONDS, 2)
    .map(new Func1<List<String>, Observable<String>>() {
        public Observable<String> call(List<String> l) {
            return Observable.from(l);
        }
    })
I made some tests to see the difference: https://gist.github.com/jcfandino/fd47277ada821f51a9d4
observable.map(delayOn4).window(10, TimeUnit.MILLISECONDS, 2)
    .subscribe(new UpdateCountdowns("window"));
assertTrue(itemsCount.await(100, TimeUnit.MILLISECONDS));
assertTrue(batchesCount.await(100, TimeUnit.MILLISECONDS));
Prints:
window - 1968827463: 1
window - 1968827463: 2
window - 1968827463: 3
window - 1267747034: 4
window - 1267747034: 5
Notice that it emits two batches instead of three, and the first one has three elements instead of two.
On the other hand, by using buffer:
observable.map(delayOn4).buffer(10, TimeUnit.MILLISECONDS, 2)
    .map(new Func1<List<String>, Observable<String>>() {
        public Observable<String> call(List<String> l) {
            return Observable.from(l);
        }
    }).subscribe(new UpdateCountdowns("buffer"));
assertTrue(itemsCount.await(100, TimeUnit.MILLISECONDS));
assertTrue(batchesCount.await(100, TimeUnit.MILLISECONDS));
Prints:
buffer - 1257525795: 1
buffer - 1257525795: 2
buffer - 1849466438: 3
buffer - 104886386: 4
buffer - 104886386: 5
Three batches with two elements at most; since the "4" arrived late, it ended up in the next batch.
I don't know if this is the expected behavior or there may be a bug in there.