I want to buffer elements and emit them as collection when there is no new element for x amount of time. how to do that?
For example given input
INPUT    TIME
1        0  
2        0
3        100
4        150
5        400
6        450
7        800   
If my x=200 I want to emit {1, 2, 3, 4}, {5, 6}, {7}
What I tried is simple buffer() with time, but it doesn't offer debounce. I also tried throttleFirst() on source and flatMap() it with buffer().take(1) on source inside flatMap, it works similarly but not exactly as desired.
You need publish as you need the same source to control the buffering behavior through a debounce:
static <T> ObservableTransformer<T, List<T>> bufferDebounce(
        long time, TimeUnit unit, Scheduler scheduler) {
    return o ->
        o.publish(v -> 
            v.buffer(v.debounce(time, unit, scheduler)
                .takeUntil(v.ignoreElements().toObservable())
            )
        );
}
@Test
public void test() {
    PublishSubject<Integer> ps = PublishSubject.create();
    TestScheduler sch = new TestScheduler();
    ps.compose(bufferDebounce(200, TimeUnit.MILLISECONDS, sch))
    .subscribe(
            v -> System.out.println(sch.now(TimeUnit.MILLISECONDS)+ ": " + v),
            Throwable::printStackTrace,
            () -> System.out.println("Done"));
    ps.onNext(1);
    ps.onNext(2);
    sch.advanceTimeTo(100, TimeUnit.MILLISECONDS);
    ps.onNext(3);
    sch.advanceTimeTo(150, TimeUnit.MILLISECONDS);
    ps.onNext(4);
    sch.advanceTimeTo(400, TimeUnit.MILLISECONDS);
    ps.onNext(5);
    sch.advanceTimeTo(450, TimeUnit.MILLISECONDS);
    ps.onNext(6);
    sch.advanceTimeTo(800, TimeUnit.MILLISECONDS);
    ps.onNext(7);
    ps.onComplete();
    sch.advanceTimeTo(850, TimeUnit.MILLISECONDS);
}
The takeUntil is there to prevent the completion of o to trigger an empty buffer.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With