I have a PublishSubject that keeps emiting a signal every X seconds, I would like to only consider the first item that is emitted after Y seconds.
Example
That is, skip every item until a certain timespan has passed.
This is not a debounce, cause debounce needs the original Observable to stay 5 seconds without emitting, but mine is emitting every second.
Please have a look at my solution. 'tickEverySecond' will emit a value every second. 'result' will collect all items emitted by 'tickEverySecond' during a time-window of 5 seconds. Every 5 second window the last emitted value from 'tickEverySecond' will be pushed to subscriber.
@Test
public void name() {
TestScheduler testScheduler = new TestScheduler();
Flowable<Long> tickEverySecond = Flowable.interval(0, 1, TimeUnit.SECONDS, testScheduler);
Flowable<Long> result = tickEverySecond.window(5, TimeUnit.SECONDS, testScheduler).flatMap(longFlowable -> longFlowable.takeLast(1));
TestSubscriber<Long> test = result.test();
testScheduler.advanceTimeTo(4, TimeUnit.SECONDS);
test.assertValueCount(0).assertNotComplete();
testScheduler.advanceTimeTo(5, TimeUnit.SECONDS);
test.assertValue(4L).assertNotComplete();
testScheduler.advanceTimeTo(9, TimeUnit.SECONDS);
test.assertValues(4L).assertNotComplete();
testScheduler.advanceTimeTo(10, TimeUnit.SECONDS);
test.assertValues(4L, 9L).assertNotComplete();
testScheduler.advanceTimeTo(14_999, TimeUnit.MILLISECONDS);
test.assertValues(4L, 9L).assertNotComplete();
}
You can have the same effect with following operator:
Flowable<Long> result = tickEverySecond.throttleLast(5, TimeUnit.SECONDS, testScheduler);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With