Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava 2 equivalent to isUnsubscribed

I've been working through the examples in the book Reactive Programming with RxJava, which is targeted at version 1 not 2. An introduction to infinite streams has the following example (and notes there are better ways to deal with the concurrency):

Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
    Runnabler = () -> {
        BigInteger i = ZERO;
        while (!subscriber.isUnsubscribed()) {
            subscriber.onNext(i);
            i = i.add(ONE);
        }
    };
    new Thread(r).start();
});

...

Subscription subscription = naturalNumbers.subscribe(x -> log(x));
/* after some time... */
subscription.unsubscribe();

However, in RxJava 2, the lambda expression passed to the create() method is of type ObservableEmitter and this doesn't have an isUnsubscribed() method. I've had a look in What's Different in 2.0 and also performed a search of the repository but can't find any such method.

How would this same functionality be achieved in 2.0?

Edited to include solution as given below (n.b. using kotlin):

val naturalNumbers = Observable.create<BigInteger> { emitter ->
    Thread({
        var int: BigInteger = BigInteger.ZERO
        while (!emitter.isDisposed) {
            emitter.onNext(int)
            int = int.add(BigInteger.ONE)
        }
    }).start()
}

val first = naturalNumbers.subscribe { log("First: $it") }
val second = naturalNumbers.subscribe { log("Second: $it") }

Thread.sleep(5)
first.dispose()
Thread.sleep(5)
second.dispose()
like image 374
amb85 Avatar asked Jun 13 '17 15:06

amb85


1 Answers

After you subscribe to Observable, Disposable is returned. You can save it to your local variable and check disposable.isDisposed() to see if it still subscribing or not.

like image 61
Tuby Avatar answered Nov 13 '22 01:11

Tuby