Could you explain why I am getting strange output when I change schedulers of the returned Observable in flatMap? For example, I have
Observable.range(1, 9)
.flatMap {
if (it < 5) {
Observable.just(it)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
} else {
Observable.just(it)
}
}
.subscribe({ println("${it}: ${Thread.currentThread().name}") })
println("END")
Thread.sleep(200)
As output I have different results on each run. E.g. The first launch
1: RxCachedThreadScheduler-3
2: RxCachedThreadScheduler-3
3: RxCachedThreadScheduler-3
5: main
6: main
7: main
END
4: RxCachedThreadScheduler-6
8: RxCachedThreadScheduler-6
9: RxCachedThreadScheduler-6
The second launch outputs this:
5: main
1: main
2: main
3: main
6: main
7: main
8: main
9: main
END
4: RxCachedThreadScheduler-8
flatMap
merges on one of the participating thread non-deterministically, thus even if the inner sources have subscribeOn
and/or observeOn
defined, there is no guarantee which thread will win at a particular moment to emit items from the sources. Therefore, you have to apply observeOn
after flatMap
if you want to ensure subsequent event processing happens on the desired thread (until there is another async boundary operator).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With