
Pause and resume an observable based on a boolean gate in RxJava 2.X?

Let's say I have a Processor that emits a boolean value whenever a button is pressed; think of this as a toggle.

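    // gateValue must be a field (or another mutable holder): an anonymous class cannot reassign a local variable.
    boolean gateValue = true;
    PublishProcessor<Boolean> gate = PublishProcessor.create();
    view.onButtonClicked()
            .subscribe(new Action1<Void>() {
                @Override
                public void call(final Void aVoid) {
                    // Flip the gate and publish its new state.
                    gate.onNext(gateValue = !gateValue);
                }
            });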

What I would like to do is use the value of the gate to pause and resume an observable sequence, buffering the emitted values whilst paused.

I've read into this a lot and while it seems possible in reactive extensions for other languages, RxJava doesn't seem to support it.

Here's an example of what I'd like to achieve. It simply outputs an incrementing value every second. When I press the button, I want the output to stop until I press it again, at which point every item emitted between the two button presses should be output:

    Flowable.interval(1, TimeUnit.SECONDS)
            // bufferWhile is the operator I wish existed; it is not part of RxJava.
            .bufferWhile(gate)
            .flatMapIterable(longs -> longs)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(final Long aLong) throws Exception {
                    view.displayTime(aLong);
                }
            });

Does anyone know of a way to achieve something like this?

Edit: I've written a blog post on how to achieve this: https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk

Scott Cooper asked Nov 15 '16 17:11


2 Answers

There is now a valve() operator in the RxJava2Extensions library that provides the requested behavior.
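To make the requested semantics concrete, here is a minimal plain-Java sketch of a gate that forwards items while open and queues them while closed, flushing the backlog in arrival order when it reopens. It is only an illustration under stated assumptions: `GatedBuffer`, `onNext` and `setOpen` are hypothetical names invented for this example, it is not how the library operator is implemented, and it ignores backpressure, errors and completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper (not part of RxJava): passes items through while open, queues them while closed. */
final class GatedBuffer<T> {
    private final Consumer<T> downstream;
    private final List<T> pending = new ArrayList<>();
    private boolean open;

    public GatedBuffer(Consumer<T> downstream, boolean initiallyOpen) {
        this.downstream = downstream;
        this.open = initiallyOpen;
    }

    /** Call for every source item. */
    public synchronized void onNext(T item) {
        if (open) {
            downstream.accept(item);
        } else {
            pending.add(item);
        }
    }

    /** Call for every gate value; opening the gate flushes the backlog in arrival order. */
    public synchronized void setOpen(boolean nowOpen) {
        open = nowOpen;
        if (nowOpen) {
            for (T item : pending) {
                downstream.accept(item);
            }
            pending.clear();
        }
    }

    public static void main(String[] args) {
        List<Long> seen = new ArrayList<>();
        GatedBuffer<Long> gate = new GatedBuffer<>(seen::add, true);
        gate.onNext(0L);
        gate.setOpen(false);      // pause
        gate.onNext(1L);
        gate.onNext(2L);
        System.out.println(seen); // [0]
        gate.setOpen(true);       // resume: 1 and 2 are released
        gate.onNext(3L);
        System.out.println(seen); // [0, 1, 2, 3]
    }
}
```

In a reactive pipeline the source stream would drive `onNext` and the Boolean gate stream would drive `setOpen`; a real operator additionally has to handle concurrency, termination and demand.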

akarnokd answered Oct 15 '22 10:10


You can achieve this using default RxJava operators:

    final Random random = new Random();
    final Observable<Boolean> gates = Observable.interval(10, TimeUnit.SECONDS)
            .map(it -> random.nextBoolean())
            .startWith(false)
            .doOnNext(it -> System.out.println("Gate " + (it ? "opened" : "closed")))
            .take(100);
    final Observable<Long> data = Observable.interval(3, TimeUnit.SECONDS)
            .doOnNext(it -> System.out.println("New value " + it))
            .take(100);

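    // Share both streams so the window and buffer branches see the same emissions.
    gates.publish(innerGates -> data.publish(innerData -> Observable.merge(
            // Gate open: pass values straight through until the gate closes.
            innerData.window(
                    innerGates.distinctUntilChanged().filter(it -> it),
                    (ignore -> innerGates.distinctUntilChanged().filter(it -> !it))
            ).flatMap(it -> it),
            // Gate closed: collect values into a list and release it when the gate reopens.
            innerData.buffer(
                    innerGates.distinctUntilChanged().filter(it -> !it),
                    (ignore -> innerGates.distinctUntilChanged().filter(it -> it))
            )
                    // RxJava 2.x name; on RxJava 1.x this is Observable::from.
                    .flatMap(Observable::fromIterable)
    )))
            .subscribe(it -> System.out.println("On next " + it));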

Guliash answered Oct 15 '22 10:10