I have an Observable that emits ticks every second:
Observable.interval(0, 1, TimeUnit.SECONDS)
    .take(durationInSeconds + 1);
I'd like to pause this Observable so it stops emitting numbers, and resume it on demand.
There are some gotchas:
According to the Observable Javadoc, the interval operator doesn't support backpressure.
There is also this note on blocking the callstack as an alternative to backpressure:
"Another way of handling an overproductive Observable is to block the callstack (parking the thread that governs the overproductive Observable). This has the disadvantage of going against the “reactive” and non-blocking model of Rx. However this can be a viable option if the problematic Observable is on a thread that can be blocked safely. Currently RxJava does not expose any operators to facilitate this."
Is there a way to pause an interval Observable? Or should I implement my own 'ticking' Observable with some backpressure support?
There are many ways of doing this. For example, you can still use interval() and maintain two additional pieces of state: a boolean flag "paused" and a counter.
public static final Observable<Long> pausableInterval(
        final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {
    // Counts only the ticks that get through, so the output has no gaps after a pause.
    final AtomicLong counter = new AtomicLong();
    return Observable.interval(initial, interval, unit, scheduler)
        .filter(tick -> !paused.get())          // drop ticks while paused
        .map(tick -> counter.getAndIncrement()); // renumber the remaining ticks
}
Then you just call paused.set(true) or paused.set(false) somewhere to pause or resume.
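To see which values come out without waiting on a real timer, the filter-then-count steps can be replayed over a plain loop of ticks. This is only a JDK-only simulation, not RxJava itself: the class PauseSimulation and the helper simulate(...) are hypothetical names invented for this sketch, and the flag is flipped at fixed tick indices rather than from another thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class PauseSimulation {

    // Hypothetical helper: feeds `ticks` raw ticks through the same two steps
    // as the pipeline above (drop while paused, then renumber with a counter).
    // The flag is set just before tick `pauseFrom` and cleared just before tick `resumeAt`.
    public static List<Long> simulate(int ticks, int pauseFrom, int resumeAt) {
        AtomicBoolean paused = new AtomicBoolean(false);
        AtomicLong counter = new AtomicLong();
        List<Long> emitted = new ArrayList<>();
        for (int tick = 0; tick < ticks; tick++) {
            if (tick == pauseFrom) paused.set(true);
            if (tick == resumeAt) paused.set(false);
            if (!paused.get()) {                        // mirrors filter(tick -> !paused.get())
                emitted.add(counter.getAndIncrement()); // mirrors map(tick -> counter.getAndIncrement())
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // 8 raw ticks, paused during ticks 3 and 4
        System.out.println(simulate(8, 3, 5)); // prints [0, 1, 2, 3, 4, 5]
    }
}
```

Two raw ticks are swallowed during the pause, but the emitted numbers stay consecutive because the counter only advances for ticks that pass the filter. Note that the underlying interval keeps firing while paused; the ticks are simply discarded.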
EDIT 2016-06-04
There is a small problem with the solution above. If the same observable instance is subscribed to more than once, each new subscription continues counting from the value reached at the last unsubscribe. For example:
Observable<Long> o = pausableInterval(...);
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();
While list1 will be [0,1,2,3,4] as expected, list2 will actually be [5,6,7,8,9]. If this behavior is not desired, the observable has to be made stateless. This can be achieved with the scan() operator. The revised version can look like this:
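public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay,
        final long period, TimeUnit unit, Scheduler scheduler) {
    return Observable.interval(initialDelay, period, unit, scheduler)
        .filter(tick -> !pause.get())
        // scan() without a seed passes the first item through unchanged (tick 0
        // unless paused from the start), then adds 1 for each later item.
        .scan((acc, tick) -> acc + 1);
}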
Or, if you would rather not depend on Java 8 and lambdas, you can do the same thing with Java 6+ compatible code:
https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361
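The difference between the two versions comes down to where the running count lives. The sketch below models that with plain JDK code rather than RxJava: each "observable" is reduced to a function that returns the first n values of one subscription. The class SharedCounterDemo and both factory methods are hypothetical names used only for illustration, and the model assumes the stream is not paused when a subscription starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntFunction;

public class SharedCounterDemo {

    // Models the first version: one counter is created when the pipeline is
    // built and is then shared by every subscription.
    public static IntFunction<List<Long>> sharedCounter() {
        AtomicLong counter = new AtomicLong();
        return n -> {
            List<Long> out = new ArrayList<>();
            for (int i = 0; i < n; i++) out.add(counter.getAndIncrement());
            return out;
        };
    }

    // Models the scan() version: the accumulator is created anew for each
    // subscription, so nothing carries over between them.
    public static IntFunction<List<Long>> perSubscription() {
        return n -> {
            long acc = 0;
            List<Long> out = new ArrayList<>();
            for (int i = 0; i < n; i++) out.add(acc++);
            return out;
        };
    }

    public static void main(String[] args) {
        IntFunction<List<Long>> shared = sharedCounter();
        System.out.println(shared.apply(5)); // prints [0, 1, 2, 3, 4]
        System.out.println(shared.apply(5)); // prints [5, 6, 7, 8, 9]

        IntFunction<List<Long>> fresh = perSubscription();
        System.out.println(fresh.apply(5));  // prints [0, 1, 2, 3, 4]
        System.out.println(fresh.apply(5));  // prints [0, 1, 2, 3, 4]
    }
}
```

The second call on the shared variant reproduces the [5,6,7,8,9] surprise described in the edit, while the per-subscription variant starts over each time.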