Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can an Observable be paused without losing the items emitted?

I have an Observable that emits ticks every second:

Observable.interval(0, 1, TimeUnit.SECONDS)
    .take(durationInSeconds + 1));

I'd like to pause this Observable so it stops emitting numbers, and resume it on demand.

There are some gotchas:

  • according to the Observable Javadoc, interval operator doesn't support backpressure
  • the RxJava wiki about backpressure has a section about Callstack blocking as a flow-control alternative to backpressure:

Another way of handling an overproductive Observable is to block the callstack (parking the thread that governs the overproductive Observable). This has the disadvantage of going against the “reactive” and non-blocking model of Rx. However this can be a viable option if the problematic Observable is on a thread that can be blocked safely. Currently RxJava does not expose any operators to facilitate this.

Is there a way to pause an interval Observable? Or should I implement my own 'ticking' Observable with some backpressure support?

like image 353
mgryszko Avatar asked Feb 08 '23 14:02

mgryszko


1 Answers

There are many ways of doing this. For example, you can still use interval() and maintain two additional states: say boolean flag "paused" and a counter.

public static final Observable<Long> pausableInterval(
  final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {

  final AtomicLong counter = new AtomicLong();
  return Observable.interval(initial, interval, unit, scheduler)
      .filter(tick -> !paused.get())
      .map(tick -> counter.getAndIncrement()); 
}

Then you just call paused.set(true/false) somewhere to pause/resume

EDIT 2016-06-04

There is a little problem with a solution above. If we reuse the observable instance multiple times, it will start from the value at the last unsubscribe. For example:

Observable<Long> o = pausableInterval(...)
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();

While list1 will be expected [0,1,2,3,4], list2 will be actually [5,6,7,8,9]. If the behavior above is not desired, the observable has to be made stateless. This can be achieved by scan() operator. The revised version can look like this:

  public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay, 
      final long period, TimeUnit unit, Scheduler scheduler) {

    return Observable.interval(initialDelay, period, unit, scheduler)
        .filter(tick->!pause.get())
        .scan((acc,tick)->acc + 1);
  }

Or if you do not wish dependency on Java 8 and lambdas you can do something like this using Java 6+ compatible code:

https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361

like image 134
yurgis Avatar answered Apr 25 '23 12:04

yurgis