
RxJava: Window on count or timespan without dropping elements

Tags:

rx-java

I want to divide my stream into batches of a certain maximum size and, if that size isn't reached after a while, close the batch and start a new one. For that I tried using window(count):

things.window(10)

However this waits until 10 elements are received to emit a new Observable window. If I use the window(timespan, unit, count) operator:

things.window(1, TimeUnit.SECONDS, 10)

I will lose all the elements that arrive after the 10th and before the timespan is completed.

I'd like a similar operator that, instead of waiting for the timespan to complete, emits a new window as soon as the count is reached.

things.windowXXX(timespan = 1s, count = 2) : Observable[T]

things:   ----o--o--o-----------o----o------->
timespan: [      )[    1s    ][      )[    -->
window 1: -----o-o-|
window 2:          -o---------|
window 3:                     --o-----o-| 
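
To pin down the semantics sketched in the diagram, here is a minimal plain-Java illustration that does not use RxJava at all. It replays a list of arrival times and closes a batch on whichever comes first, the count or the timespan. The class name CountOrTimeBatcher, the method batch, and its parameters are hypothetical names made up for this sketch. It also assumes that windows are half-open, [start, start + timespan), that the first window opens at time 0, and that empty windows are simply skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of RxJava.
final class CountOrTimeBatcher {

    /**
     * Groups values into batches. A batch closes when it holds maxCount values
     * or when timespanMillis has elapsed since its window opened, whichever
     * comes first. Closing by count opens the next window at the arrival time
     * of the value that filled the batch.
     *
     * arrivalMillis must be sorted ascending and match values in length.
     */
    static <T> List<List<T>> batch(long[] arrivalMillis, List<T> values,
                                   long timespanMillis, int maxCount) {
        if (timespanMillis <= 0 || maxCount <= 0) {
            throw new IllegalArgumentException("timespan and count must be positive");
        }
        if (arrivalMillis.length != values.size()) {
            throw new IllegalArgumentException("one arrival time per value is required");
        }

        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long windowStart = 0; // assumption: the first window opens at time 0

        for (int i = 0; i < values.size(); i++) {
            long t = arrivalMillis[i];

            // Close every window whose timespan expired before this value arrived.
            while (t >= windowStart + timespanMillis) {
                if (!current.isEmpty()) { // assumption: empty windows are skipped
                    batches.add(current);
                    current = new ArrayList<>();
                }
                windowStart += timespanMillis;
            }

            current.add(values.get(i));

            // Count reached: close the batch now and restart the timespan.
            if (current.size() == maxCount) {
                batches.add(current);
                current = new ArrayList<>();
                windowStart = t;
            }
        }

        // Flush whatever is left when the input ends.
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

With assumed arrival times of 400, 700, 1000, 2200 and 2600 ms for the values a to e, a timespan of 1000 ms and a count of 2, batch returns [[a, b], [c], [d, e]], which corresponds to the three windows in the diagram above.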
jcfandino asked Aug 06 '14 16:08


1 Answer

I found a way to work around this issue by using buffer(timespan, timeunit, count) instead of window. I thought buffer and window behaved the same, except that one emits lists and the other emits observables, but there seems to be a difference here.

My solution was to use buffer and then map each result to an observable:

observable.buffer(10, TimeUnit.MILLISECONDS, 2)
          // Turn each buffered list back into an Observable, like a window.
          .map(new Func1<List<String>, Observable<String>>() {
              @Override
              public Observable<String> call(List<String> l) {
                  return Observable.from(l);
              }
          })

I made some tests to see the difference: https://gist.github.com/jcfandino/fd47277ada821f51a9d4

    observable.map(delayOn4).window(10, TimeUnit.MILLISECONDS, 2)
            .subscribe(new UpdateCountdowns("window"));
    assertTrue(itemsCount.await(100, TimeUnit.MILLISECONDS));
    assertTrue(batchesCount.await(100, TimeUnit.MILLISECONDS));

Prints:

window - 1968827463: 1
window - 1968827463: 2
window - 1968827463: 3
window - 1267747034: 4
window - 1267747034: 5

Notice that it emits two batches instead of three, and the first one has three elements instead of two.

On the other hand, by using buffer:

    observable.map(delayOn4).buffer(10, TimeUnit.MILLISECONDS, 2)
            .map(new Func1<List<String>, Observable<String>>() {
                public Observable<String> call(List<String> l) {
                    return Observable.from(l);
                }
            }).subscribe(new UpdateCountdowns("buffer"));
    assertTrue(itemsCount.await(100, TimeUnit.MILLISECONDS));
    assertTrue(batchesCount.await(100, TimeUnit.MILLISECONDS));

Prints:

buffer - 1257525795: 1
buffer - 1257525795: 2
buffer - 1849466438: 3
buffer - 104886386: 4
buffer - 104886386: 5

Three batches with at most two elements each; because the "4" arrived late, it ended up in the next batch.

I don't know whether this is the expected behavior or a bug.

jcfandino answered Sep 30 '22 07:09