
RxJava: calling unsubscribe from within onNext

Tags: java, rx-java

I wonder whether it is legal to call unsubscribe from within an onNext handler, like this:

List<Integer> gatheredItems = new ArrayList<>();

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
    public void onNext(Integer item) {
        gatheredItems.add(item);
        if (item == 3) {
            unsubscribe();
        }
    }
    public void onCompleted() {
        // noop
    }
    public void onError(Throwable sourceError) {
        // noop
    }
};

Observable<Integer> source = Observable.range(0,100);

source.subscribe(subscriber);
sleep(1000);
System.out.println(gatheredItems);

The code above correctly prints only the four gathered elements: [0, 1, 2, 3]. But if somebody changes the source observable to be cached:

Observable<Integer> source = Observable.range(0,100).cache();

Then all one hundred elements are gathered. I don't have control over the source observable (whether it is cached or not), so how can I reliably unsubscribe from within onNext?

BTW: Is unsubscribing within onNext the wrong thing to do?

(My actual use case is that in onNext I'm writing to an output stream, and when an IOException occurs nothing more can be written to the output, so I need to somehow stop further processing.)

Wojciech Gdela asked Jun 24 '15 12:06



1 Answer

The cache() operator subscribes to its source once the first subscriber arrives and caches everything the source emits; it offers no way to stop this from downstream. To avoid excessive retention, you need to stop the stream before cache():

Observable<Integer> source = Observable.range(1, 100);

PublishSubject<Integer> stop = PublishSubject.create();

source
.doOnNext(v -> System.out.println("Generating " + v))
.takeUntil(stop)
.cache()
.doOnNext(new Action1<Integer>() {
    int calls;
    @Override
    public void call(Integer t) {
        System.out.println("Saving " + t);
        if (++calls == 3) {
            stop.onNext(1);
        }
    }
})
.subscribe();

Edit: the example above doesn't work with RxJava versions below 1.0.13, so here is a version that should:

SerialSubscription ssub = new SerialSubscription();

ConnectableObservable<Integer> co = source
        .doOnNext(v -> System.out.println("Generating " + v))
        .replay();

co.doOnNext(v -> {
    System.out.println("Saving " + v);
    if (v == 3) {
        ssub.unsubscribe();
    }
})
.subscribe();

co.connect(v -> ssub.set(v));
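
For the use case in the question (stop writing once an IOException occurs), a guard flag inside the handler is one complementary option: even if a cached source keeps delivering items after unsubscribe(), the handler simply ignores them. What follows is a minimal sketch in plain Java, deliberately without RxJava so it runs on its own. GuardedWriter, accept, failingAfter, and demo are hypothetical names introduced for illustration, and the loop in demo only stands in for a source that keeps pushing items.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of RxJava: holds the write logic that would live in onNext.
class GuardedWriter {
    private final OutputStream out;
    private final List<Integer> written = new ArrayList<>();
    // Set once the first IOException is seen; volatile in case items arrive on another thread.
    private volatile boolean failed;

    GuardedWriter(OutputStream out) {
        this.out = out;
    }

    // Intended to be called from onNext. Returns false once the stream has failed,
    // which is the caller's cue to unsubscribe.
    boolean accept(int item) {
        if (failed) {
            return false; // ignore items that a cached source still delivers
        }
        try {
            out.write(item);
            written.add(item);
            return true;
        } catch (IOException e) {
            failed = true;
            return false;
        }
    }

    List<Integer> written() {
        return written;
    }

    // Stand-in for a broken stream: every write after `limit` bytes throws.
    static OutputStream failingAfter(int limit) {
        return new OutputStream() {
            private int count;

            @Override
            public void write(int b) throws IOException {
                if (count++ >= limit) {
                    throw new IOException("stream closed");
                }
            }
        };
    }

    // Simulates a source that ignores unsubscription and pushes all 100 items.
    static List<Integer> demo() {
        GuardedWriter writer = new GuardedWriter(failingAfter(4));
        for (int i = 0; i < 100; i++) {
            writer.accept(i);
        }
        return writer.written();
    }
}
```

In a real Subscriber, onNext would call accept(item) and invoke unsubscribe() when it returns false. This does not reduce what cache() retains upstream; it only keeps the handler from touching the broken stream again.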

akarnokd answered Oct 21 '22 16:10