I have the problem that my Observable chains don't terminate when I use a flatmap.
I've boiled down my example to this:
int count = Observable.just(1,2,3)
.flatMap(s -> Observable.<Integer>create(subscr-> {
subscr.onNext(s);
if(s>2) {
subscr.onCompleted();
}
}))
.doOnEach(a->System.err.println(a.getKind()+" -> "+a.toString()))
.count()
.toBlocking()
.first();
System.err.println("Count: "+count);
I'd expect the 'doOnEach' to report three onNext events, followed by a onCompleted, and finally the chain should terminate.
However, the output is this:
OnNext -> [rx.Notification@c9e6d24d OnNext 1]
OnNext -> [rx.Notification@c9e6d24e OnNext 2]
OnNext -> [rx.Notification@c9e6d24f OnNext 3]
(and then in keeps hanging)
If I remove the flatMap operator:
int count = Observable.just(1,2,3)
.doOnEach(a->System.err.println(a.getKind()+" -> "+a.toString()))
.count()
.toBlocking()
.first();
System.err.println("Count: "+count);
... it works exactly as expected:
CREATED!
OnNext -> [rx.Notification@e3598bd9 OnNext 1]
OnNext -> [rx.Notification@e3598bda OnNext 2]
OnNext -> [rx.Notification@e3598bdb OnNext 3]
OnCompleted -> [rx.Notification@3834d63f OnCompleted]
Count: 3
I guess I am doing something wrong (I can't imagine a bug in such a basic scenario), but I don't see it.
Any help is appreciated... (I'm using RxJava 1.1.0)
The issue is in the following lines:
if(s>2) {
subscr.onCompleted();
}
Observable
s of s == 1
and s == 2
won't emit onCompleted
. As flatMap
only emits onCompleted
when all Observable
s are completed, it won't emit onCompleted
in your case.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With