ReactiveX emit null or sentinel value after timeout

I'm looking for a clean way to transform a source Observable so that it emits a single null (or sentinel value) whenever the source has not emitted an item for some duration.

For example, if the source observable emits 1, 2, 3, then stops emitting for 10 seconds before emitting 4, 5, 6, I would like the emitted items to be 1, 2, 3, null, 4, 5, 6.
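To pin down the behaviour being asked for, here is a minimal plain-Java reference model with no Rx involved. It works offline on a list of timestamped items, so it only covers gaps between items and never emits a trailing marker after the last one. The names `StaleMarkerSketch`, `Timed` and `withStaleMarkers`, the 5-second timeout, and the rule that a gap must be strictly longer than the timeout are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StaleMarkerSketch {

    /** A source item together with the time (in milliseconds) at which it was emitted. */
    static final class Timed {
        final long atMillis;
        final Integer value;

        Timed(long atMillis, Integer value) {
            this.atMillis = atMillis;
            this.value = value;
        }
    }

    /**
     * Returns the values in order, inserting one null wherever the gap between
     * two consecutive emissions is strictly longer than timeoutMillis.
     */
    static List<Integer> withStaleMarkers(List<Timed> emissions, long timeoutMillis) {
        List<Integer> out = new ArrayList<>();
        Timed previous = null;
        for (Timed current : emissions) {
            if (previous != null && current.atMillis - previous.atMillis > timeoutMillis) {
                out.add(null); // the previous value went stale before this one arrived
            }
            out.add(current.value);
            previous = current;
        }
        return out;
    }

    public static void main(String[] args) {
        // 1, 2, 3 in quick succession, a 10-second pause, then 4, 5, 6.
        List<Timed> emissions = Arrays.asList(
            new Timed(0, 1), new Timed(100, 2), new Timed(200, 3),
            new Timed(10_200, 4), new Timed(10_300, 5), new Timed(10_400, 6));
        System.out.println(withStaleMarkers(emissions, 5_000)); // [1, 2, 3, null, 4, 5, 6]
    }
}
```

A live stream cannot look ahead like this, which is why the real solution needs a timer that is restarted on every source item.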

The use case is displaying values in a UI where the displayed value should turn into a dash (-) or N/A once the last emitted value is stale.

I looked into the timeout operator, but it terminates the Observable when the timeout occurs, which is undesirable.

Using RxJava.

jenglert asked Mar 08 '16 16:03


2 Answers

Based on akarnokd's answer and an answer in a similar question, an alternative implementation:

A single sentinel value (as per the OP)

If you're looking for a single value to indicate the lapse of time between emissions:

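import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
import rx.subjects.TestSubject;

// Virtual-time scheduler so the test controls the clock
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
// Emits -1 once after `duration`, then stays silent;
// any source item cancels the countdown and repeat() restarts it
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
    .concatWith(Observable.never())
    .takeUntil(subject)
    .repeat();

subject.mergeWith(timeout).subscribe(subscriber);

// Second argument is the delay in milliseconds from the current virtual time
subject.onNext(1,   0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS); // delivers 1, 2, 3
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS); // quiet period: the sentinel fires once

subject.onNext(4,   0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS); // delivers 4, 5, 6

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));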

Continuous sentinel values

If you want to keep receiving sentinel values for as long as the source observable stays silent, one per elapsed duration:

final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
// Emits -1 every `duration` while the source is silent;
// any source item cancels the interval and repeat() restarts it
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
    .map(x -> -1)
    .takeUntil(subject)
    .repeat();

subject.mergeWith(timeout).subscribe(subscriber);

// Second argument is the delay in milliseconds from the current virtual time
subject.onNext(1,   0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS); // delivers 1, 2, 3
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS); // quiet period: the sentinel fires three times

subject.onNext(4,   0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS); // delivers 4, 5, 6

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));

The only difference between the two is the timeout observable: whether it fires once or keeps recurring.
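As a rough way to see how the two variants diverge, the sketch below counts how many sentinels each would emit during one silent gap. It is plain Java rather than RxJava, the names `SentinelCountSketch` and `sentinelsInGap` are hypothetical, and it assumes that a tick landing exactly on the next source item is not counted. In a real scheduler that boundary case depends on the order in which the two events were scheduled.

```java
final class SentinelCountSketch {

    /**
     * Number of sentinels emitted during a silent gap of gapMillis, given a
     * timeout of durationMillis. Assumption: ticks fall at durationMillis,
     * 2 * durationMillis, ... after the last source item, and only ticks
     * strictly before the next source item are counted.
     */
    static long sentinelsInGap(long gapMillis, long durationMillis, boolean recurring) {
        if (durationMillis <= 0) {
            throw new IllegalArgumentException("durationMillis must be positive");
        }
        if (gapMillis <= durationMillis) {
            return 0; // the next item arrived before the first tick
        }
        if (!recurring) {
            return 1; // single-sentinel variant: one marker per gap
        }
        // Recurring variant: count the multiples of durationMillis below gapMillis.
        return (gapMillis - 1) / durationMillis;
    }

    public static void main(String[] args) {
        System.out.println(sentinelsInGap(350, 100, false)); // 1
        System.out.println(sentinelsInGap(350, 100, true));  // 3
    }
}
```

For short gaps the two variants behave the same; they only differ once the silence lasts longer than two durations.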

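You can replace -1 with null as needed. This works in RxJava 1.x, which allows null items; RxJava 2 and later reject nulls, so a non-null sentinel is required there.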

All of the above is tested with RxJava 1.0.17 using Java 1.8.0_72.

Whymarrh answered Oct 19 '22 07:10


You can achieve this with a somewhat complicated publish-amb-timer setup:

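import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;

PublishSubject<Integer> ps = PublishSubject.create();
TestScheduler s = Schedulers.test();
TestSubscriber<Integer> ts = new TestSubscriber<>();

// publish(selector) shares a single subscription to the source as `o`
ps.publish(o -> 
    // Race the next source item against a 10-second timer that emits null
    o.take(1).ambWith(Observable.timer(10, TimeUnit.SECONDS, s).map(v -> (Integer)null))
    // Start the next round; stop once the source itself terminates
    .repeat().takeUntil(o.ignoreElements())
).subscribe(ts);

ps.onNext(1);
ps.onNext(2);
ps.onNext(3);

// No source items for 15 seconds, so the timer wins one round
s.advanceTimeBy(15, TimeUnit.SECONDS);

ps.onNext(4);
ps.onNext(5);
ps.onNext(6);
ps.onCompleted();

ts.assertValues(1, 2, 3, null, 4, 5, 6);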

The source is published so that the selector can take items from it one at a time. Each round races the next source item against a timer event; whichever fires first wins, and repeat() starts the next round with the next value. Thanks to publish, none of this resubscribes to the original source.

Edit: fixed the case where repeat() went into an infinite loop when the upstream completed.

akarnokd answered Oct 19 '22 09:10