Looking for a clean way to transform a source Observable
to emit a single null
(or sentinel value) after not emitting an item for some duration.
For example, if the source observable emits 1, 2, 3
then stops emitting for 10 seconds before emitting 4, 5, 6
I would like the emitted items to be 1, 2, 3, null, 4, 5, 6
.
The use case is for displaying values in a UI where the displayed value should turn into a dash -
or N/A
if the last emitted value is stale/old.
I looked into the timeout
operator but it terminates the Observable
when the timeout occurs which is undesirable.
Using RxJava.
Based on akarnokd's answer and an answer in a similar question, an alternative implementation:
If you're looking for a single value to indicate the lapse of time between emissions:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
.concatWith(Observable.never())
.takeUntil(subject)
.repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));
If you're looking to receive values continuously after the source observable not emitting for some duration:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
.map(x -> -1)
.takeUntil(subject)
.repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));
The difference being the timeout
observable and whether it's recurring or not.
You can replace -1
with null
as needed.
All of the above is tested with RxJava 1.0.17 using Java 1.8.0_72
.
You can achieve this with a somewhat complicated publish-amb-timer setup:
PublishSubject<Integer> ps = PublishSubject.create();
TestScheduler s = Schedulers.test();
TestSubscriber<Integer> ts = new TestSubscriber<>();
ps.publish(o ->
o.take(1).ambWith(Observable.timer(10, TimeUnit.SECONDS, s).map(v -> (Integer)null))
.repeat().takeUntil(o.ignoreElements())
).subscribe(ts);
ps.onNext(1);
ps.onNext(2);
ps.onNext(3);
s.advanceTimeBy(15, TimeUnit.SECONDS);
ps.onNext(4);
ps.onNext(5);
ps.onNext(6);
ps.onCompleted();
ts.assertValues(1, 2, 3, null, 4, 5, 6);
What happens is that the source is published so you can take items one by one from it or a timer event, make sure the fastest one wins and repeat it with the next value, all without resubscribing to the original source all the time.
Edit fixed the case when the upstream completes the repeat() goes into an infinite loop.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With