Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava2 buffer with debounce

Tags:

java

rx-java2

I want to buffer elements and emit them as collection when there is no new element for x amount of time. how to do that?

For example given input

INPUT    TIME
1        0  
2        0
3        100
4        150
5        400
6        450
7        800   

If my x=200 I want to emit {1, 2, 3, 4}, {5, 6}, {7}

What I tried is simple buffer() with time, but it doesn't offer debounce. I also tried throttleFirst() on source and flatMap() it with buffer().take(1) on source inside flatMap, it works similarly but not exactly as desired.

like image 801
Tuby Avatar asked Apr 16 '18 18:04

Tuby


1 Answers

You need publish as you need the same source to control the buffering behavior through a debounce:

static <T> ObservableTransformer<T, List<T>> bufferDebounce(
        long time, TimeUnit unit, Scheduler scheduler) {
    return o ->
        o.publish(v -> 
            v.buffer(v.debounce(time, unit, scheduler)
                .takeUntil(v.ignoreElements().toObservable())
            )
        );
}

@Test
public void test() {
    PublishSubject<Integer> ps = PublishSubject.create();

    TestScheduler sch = new TestScheduler();

    ps.compose(bufferDebounce(200, TimeUnit.MILLISECONDS, sch))
    .subscribe(
            v -> System.out.println(sch.now(TimeUnit.MILLISECONDS)+ ": " + v),
            Throwable::printStackTrace,
            () -> System.out.println("Done"));

    ps.onNext(1);
    ps.onNext(2);

    sch.advanceTimeTo(100, TimeUnit.MILLISECONDS);

    ps.onNext(3);

    sch.advanceTimeTo(150, TimeUnit.MILLISECONDS);

    ps.onNext(4);

    sch.advanceTimeTo(400, TimeUnit.MILLISECONDS);

    ps.onNext(5);

    sch.advanceTimeTo(450, TimeUnit.MILLISECONDS);

    ps.onNext(6);

    sch.advanceTimeTo(800, TimeUnit.MILLISECONDS);

    ps.onNext(7);
    ps.onComplete();

    sch.advanceTimeTo(850, TimeUnit.MILLISECONDS);
}

The takeUntil is there to prevent the completion of o to trigger an empty buffer.

like image 115
akarnokd Avatar answered Oct 27 '22 12:10

akarnokd