I wonder whether it is legal to call unsubscribe
from within onNext
handler like that:
List<Integer> gatheredItems = new ArrayList<>();
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
public void onNext(Integer item) {
gatheredItems.add(item);
if (item == 3) {
unsubscribe();
}
}
public void onCompleted() {
// noop
}
public void onError(Throwable sourceError) {
// noop
}
};
Observable<Integer> source = Observable.range(0,100);
source.subscribe(subscriber);
sleep(1000);
System.out.println(gatheredItems);
The above code correctly outputs that just four elements were gathered: [0, 1, 2, 3]
. But if somebody changes the source observable to be cached:
Observable<Integer> source = Observable.range(0,100).cache();
Then all hundred elements are gathered. I don't have a control over source observable (whether it is cached or not), so how to definitely unsubscribe from within onNext
?
BTW: So is unsubscribing within onNext
wrong thing to do?
(My actual use case is that in onNext
I'm actually writing to output stream, and when an IOException
occurs nothing more could be written to the output so I need to somehow stop further processing.)
You need to cancel your job properly via Observable::create and Observable::flatMap. And set your cancalable in Observable::create.
After a Subscriber calls an Observable 's subscribe method, the Observable calls the Subscriber's Observer. onNext(T) method to emit items. A well-behaved Observable will call a Subscriber's Observer. onCompleted() method exactly once or the Subscriber's Observer.
OnNext. conveys an item that is emitted by the Observable to the observer. OnCompleted. indicates that the Observable has completed successfully and that it will be emitting no further items. OnError.
RxJava is an implementation framework for reactive programming in Java. It is an event-based code base that provides powerful and elegant asynchronous calling programs.
The cache() caches everything once the first subscriber arrives and it doesn't give any options to stop it from downstream. You need to stop the stream before the cache() to avoid too much retention:
Observable<Integer> source = Observable.range(1, 100);
PublishSubject<Integer> stop = PublishSubject.create();
source
.doOnNext(v -> System.out.println("Generating " + v))
.takeUntil(stop)
.cache()
.doOnNext(new Action1<Integer>() {
int calls;
@Override
public void call(Integer t) {
System.out.println("Saving " + t);
if (++calls == 3) {
stop.onNext(1);
}
}
})
.subscribe();
Edit: the example above doesn't work below 1.0.13 so here is a version which should:
SerialSubscription ssub = new SerialSubscription();
ConnectableObservable<Integer> co = source
.doOnNext(v -> System.out.println("Generating " + v))
.replay();
co.doOnNext(v -> {
System.out.println("Saving " + v);
if (v == 3) {
ssub.unsubscribe();
}
})
.subscribe();
co.connect(v -> ssub.set(v));
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With