Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Interrupt a single observable in concatMap

I use concatMap to process a stream of items one at a time with a long running operation. At some point I need to "interrupt" this long running operation, but only for the current item:

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    // Now I need to interrupt whichever longRunningOperation in
    // progress, but I don't want to interrupt the whole stream.
    // In other words, I want to force it to move onto the next
    // integer.
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer));
}

I tried to solve this by retaining the disposable of the "inner" stream and disposing of it at the right time. But this didn't work. The inner stream is disposed, but the concatMap never moved on to processing item 3. The test just hangs (since the outer observable never completes/terminates/disposes either)

Disposable disposable = Disposables.empty();

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    disposable.dispose();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer))
            .doOnSubscribe(disposable -> {
                // save disposable so we can "interrupt" later
                System.out.println("Saving disposable for " + integer);
                Example.this.disposable = disposable;
            });
}

Even if this did work it seemed a bit to hacky by relying on the side effect. What is the best way to accomplish this?

like image 737
tir38 Avatar asked Apr 07 '19 01:04

tir38


1 Answers

I had nearly the same question as How to cancel individual network request in Retrofit with RxJava?. I can use a PublishSubject to "interrupt"

private PublishSubject interrupter;

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    interrupter.onComplete();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    interrupter = PublishSubject.create();

    return Observable
            .just("Start working on " + integer,
                    "Still working on " + integer,
                    "Almost done working on " + integer)
            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("Finally for " + integer))
            .takeUntil(interrupter);
}
like image 100
tir38 Avatar answered Nov 11 '22 12:11

tir38