Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

In RxJava, I can't emit a onComplete from a flatMap

Tags:

rx-java

I have the problem that my Observable chains don't terminate when I use a flatmap.

I've boiled down my example to this:

int count = Observable.just(1,2,3)
        .flatMap(s -> Observable.<Integer>create(subscr-> {
            subscr.onNext(s);
            if(s>2) {
                subscr.onCompleted();
            }
        }))
        .doOnEach(a->System.err.println(a.getKind()+" -> "+a.toString()))
        .count()
        .toBlocking()
        .first();
    System.err.println("Count: "+count);

I'd expect the 'doOnEach' to report three onNext events, followed by a onCompleted, and finally the chain should terminate.

However, the output is this:

OnNext -> [rx.Notification@c9e6d24d OnNext 1]
OnNext -> [rx.Notification@c9e6d24e OnNext 2]
OnNext -> [rx.Notification@c9e6d24f OnNext 3]

(and then in keeps hanging)

If I remove the flatMap operator:

int count = Observable.just(1,2,3)
        .doOnEach(a->System.err.println(a.getKind()+" -> "+a.toString()))
        .count()
        .toBlocking()
        .first();
System.err.println("Count: "+count);

... it works exactly as expected:

CREATED!
OnNext -> [rx.Notification@e3598bd9 OnNext 1]
OnNext -> [rx.Notification@e3598bda OnNext 2]
OnNext -> [rx.Notification@e3598bdb OnNext 3]
OnCompleted -> [rx.Notification@3834d63f OnCompleted]
Count: 3

I guess I am doing something wrong (I can't imagine a bug in such a basic scenario), but I don't see it.

Any help is appreciated... (I'm using RxJava 1.1.0)

like image 630
Frank Lee Avatar asked Feb 08 '23 01:02

Frank Lee


1 Answers

The issue is in the following lines:

        if(s>2) {
            subscr.onCompleted();
        }

Observables of s == 1 and s == 2 won't emit onCompleted. As flatMap only emits onCompleted when all Observables are completed, it won't emit onCompleted in your case.

like image 140
zsxwing Avatar answered Mar 07 '23 01:03

zsxwing