I have the problem that my Observable chains don't terminate when I use flatMap.
I've boiled down my example to this:
int count = Observable.just(1, 2, 3)
    .flatMap(s -> Observable.<Integer>create(subscr -> {
        subscr.onNext(s);
        if (s > 2) {
            subscr.onCompleted();
        }
    }))
    .doOnEach(a -> System.err.println(a.getKind() + " -> " + a.toString()))
    .count()
    .toBlocking()
    .first();
System.err.println("Count: " + count);
I'd expect the 'doOnEach' to report three onNext events, followed by an onCompleted, and finally the chain should terminate.
However, the output is this:
OnNext -> [rx.Notification@c9e6d24d OnNext 1]
OnNext -> [rx.Notification@c9e6d24e OnNext 2]
OnNext -> [rx.Notification@c9e6d24f OnNext 3]
(and then it keeps hanging)
If I remove the flatMap operator:
int count = Observable.just(1, 2, 3)
    .doOnEach(a -> System.err.println(a.getKind() + " -> " + a.toString()))
    .count()
    .toBlocking()
    .first();
System.err.println("Count: " + count);
... it works exactly as expected:
CREATED!
OnNext -> [rx.Notification@e3598bd9 OnNext 1]
OnNext -> [rx.Notification@e3598bda OnNext 2]
OnNext -> [rx.Notification@e3598bdb OnNext 3]
OnCompleted -> [rx.Notification@3834d63f OnCompleted]
Count: 3
I guess I am doing something wrong (I can't imagine a bug in such a basic scenario), but I don't see it.
Any help is appreciated... (I'm using RxJava 1.1.0)
The issue is in the following lines:
if (s > 2) {
    subscr.onCompleted();
}
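The inner Observables created for s == 1 and s == 2 never call onCompleted, so they stay open forever. flatMap emits onCompleted only after the source and every inner Observable have completed, so the merged stream never completes and count() never emits a value. Calling subscr.onCompleted() unconditionally after subscr.onNext(s) lets the chain terminate.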
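To make the completion rule concrete, here is a small plain-Java sketch that models it without depending on RxJava. It is only an illustration under simplifying assumptions (everything runs synchronously, and each inner stream emits exactly one value); the class FlatMapCompletionModel and its events method are hypothetical names, not part of RxJava, and this is not how the library implements flatMap internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Simplified, hypothetical model of flatMap's completion rule.
// This is NOT RxJava's implementation, only an illustration.
public class FlatMapCompletionModel {

    // Replays the events a downstream observer would see when each source
    // item is mapped to an inner stream that emits the item and completes
    // only if innerCompletes accepts it.
    static List<String> events(int[] source, IntPredicate innerCompletes) {
        List<String> out = new ArrayList<>();
        int active = 1; // the outer source counts as one open stream
        for (int s : source) {
            active++;               // a new inner stream is subscribed
            out.add("OnNext " + s); // the inner stream emits its value
            if (innerCompletes.test(s)) {
                active--;           // this inner stream called onCompleted
            }
        }
        active--; // the outer source (like Observable.just) completes
        if (active == 0) {
            out.add("OnCompleted"); // emitted only once nothing is left open
        }
        return out;
    }

    public static void main(String[] args) {
        // Mirrors the question: only the inner stream for 3 completes.
        System.out.println(events(new int[] {1, 2, 3}, s -> s > 2));
        // Every inner stream completes, so the merged stream completes too.
        System.out.println(events(new int[] {1, 2, 3}, s -> true));
    }
}
```

In the first call two inner streams remain open, so no OnCompleted is ever recorded, which matches the hanging output in the question. In the second call every inner stream completes and OnCompleted follows the three OnNext events.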