I've been working through the examples in the book Reactive Programming with RxJava, which targets RxJava 1 rather than 2. An introduction to infinite streams has the following example (and notes that there are better ways to deal with the concurrency):
    // ZERO and ONE are static imports from java.math.BigInteger
    Observable<BigInteger> naturalNumbers = Observable.create(subscriber -> {
        Runnable r = () -> {
            BigInteger i = ZERO;
            // Keep emitting until the subscriber unsubscribes
            while (!subscriber.isUnsubscribed()) {
                subscriber.onNext(i);
                i = i.add(ONE);
            }
        };
        new Thread(r).start();
    });
    ...
    Subscription subscription = naturalNumbers.subscribe(x -> log(x));
    /* after some time... */
    subscription.unsubscribe();
However, in RxJava 2, the lambda passed to the create() method receives an ObservableEmitter, which doesn't have an isUnsubscribed() method. I've looked through What's Different in 2.0 and searched the repository, but can't find any such method.
How would this same functionality be achieved in 2.0?
Edited to include the solution based on the answer below (n.b. using Kotlin):
    val naturalNumbers = Observable.create<BigInteger> { emitter ->
        Thread({
            var int: BigInteger = BigInteger.ZERO
            while (!emitter.isDisposed) {
                emitter.onNext(int)
                int = int.add(BigInteger.ONE)
            }
        }).start()
    }
    val first = naturalNumbers.subscribe { log("First: $it") }
    val second = naturalNumbers.subscribe { log("Second: $it") }
    Thread.sleep(5)
    first.dispose()
    Thread.sleep(5)
    second.dispose()
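When you subscribe to an Observable, a Disposable is returned. You can store it in a local variable and call disposable.isDisposed() to check whether the subscription is still active. Inside create(), the ObservableEmitter exposes the same check through its own isDisposed() method, which takes the place of isUnsubscribed() from RxJava 1.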
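To make the mechanism concrete, here is a minimal sketch in plain Java of the cooperative check described above: a producer thread polls a disposed flag and stops once the consumer flips it. This is not RxJava's implementation. `SimpleDisposable`, `startNaturals`, and `emitThenDispose` are hypothetical names made up for this illustration, and only the JDK is used.

```java
import java.math.BigInteger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

class DisposalSketch {

    // Hypothetical stand-in for a disposable handle: just a thread-safe flag.
    static final class SimpleDisposable {
        private final AtomicBoolean disposed = new AtomicBoolean(false);

        void dispose() { disposed.set(true); }

        boolean isDisposed() { return disposed.get(); }
    }

    // Starts a producer thread that emits 0, 1, 2, ... until the handle is disposed.
    static Thread startNaturals(SimpleDisposable handle, Consumer<BigInteger> onNext) {
        Thread producer = new Thread(() -> {
            BigInteger i = BigInteger.ZERO;
            // Same shape as the emitter loop: check the flag before every emission.
            while (!handle.isDisposed()) {
                onNext.accept(i);
                i = i.add(BigInteger.ONE);
            }
        });
        producer.start();
        return producer;
    }

    // Waits for at least minItems values, disposes, waits for the producer
    // to exit, and returns how many values were delivered in total.
    static long emitThenDispose(int minItems) {
        SimpleDisposable handle = new SimpleDisposable();
        AtomicLong received = new AtomicLong();
        CountDownLatch enough = new CountDownLatch(minItems);
        Thread producer = startNaturals(handle, x -> {
            received.incrementAndGet();
            enough.countDown();
        });
        try {
            enough.await();   // block until minItems values have arrived
            handle.dispose(); // the producer sees this on its next loop check
            producer.join();  // returns only after the loop has exited
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.get();
    }

    public static void main(String[] args) {
        long received = emitThenDispose(1000);
        System.out.println("Received " + received + " values before the producer stopped");
    }
}
```

The exact count printed varies from run to run, because the producer may emit a few more values between the latch opening and the flag being set. The point is only that disposal is cooperative: nothing stops the thread unless the loop itself checks the flag.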