I have the following Hot Observable:
hotObservable = Observable.interval(0L, 1L, TimeUnit.SECONDS)
.map((t) -> getCurrentTimeInMillis()))
However, I can't find a good way to stop it. I was able to partially solve this using takeWhile
and a boolean
flag (runTimer
):
Observable.interval(0L, 1L, TimeUnit.SECONDS)
.takeWhile((t) -> runTimer)
.map((t) -> getCurrentTimeInMillis()))
There are 2 things I don't like in this approach though:
runTimer
around, which I don't want.runTimer
becomes false
, the Observable simply completes, which means if I want to emit again I need to create a new Observable. I don't want that. I just want the Observable stop emitting items until I tell it to start again.I was hoping for something like this:
hotObservable.stop();
hotObservable.resume();
That way I don't need to keep any flags around and the observable is always alive (it might not be emitting events though).
How can I achieve this?
One possible approach uses a BehaviorSubject and a switchMap:
BehaviorSubject<Boolean> subject = BehaviorSubject.create(true);
hotObservable = subject.distinctUntilChanged().switchMap((on) -> {
if (on) {
return Observable.interval(0L, 1L, TimeUnit.SECONDS);
} else {
return Observable.never();
}
}).map((t) -> getCurrentTimeInMillis());
By sending booleans to the subject the output of the observable can be controlled. subject.onNext(true)
will cause any observable created using that subject to begin emitting values. subject.onNext(false)
disables that flow.
The switchMap
takes care of disposing the underlying observables when it is switched off. It also uses distinctUntilChanged
to make sure it does not do unnecessary switching.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With