
Behavior of flatMap with different Schedulers in it

Tags: rx-java2

Could you explain why I get strange output when I change the schedulers of the Observable returned inside flatMap? For example, I have:

Observable.range(1, 9)
        .flatMap {
            if (it < 5) {
                Observable.just(it)
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.io())
            } else {
                Observable.just(it)
            }
        }
        .subscribe({ println("${it}: ${Thread.currentThread().name}") })
println("END")
Thread.sleep(200)

The output differs on each run. For example, the first launch prints:

1: RxCachedThreadScheduler-3
2: RxCachedThreadScheduler-3
3: RxCachedThreadScheduler-3
5: main
6: main
7: main
END
4: RxCachedThreadScheduler-6
8: RxCachedThreadScheduler-6
9: RxCachedThreadScheduler-6

The second launch outputs this:

5: main
1: main
2: main
3: main
6: main
7: main
8: main
9: main
END
4: RxCachedThreadScheduler-8
Buckstabue asked Mar 09 '23 05:03



1 Answer

flatMap merges its inner sources on one of the participating threads, and which one is non-deterministic. Even if the inner sources have subscribeOn and/or observeOn defined, there is no guarantee which thread will win at a particular moment and emit the items from the sources. Therefore, you have to apply observeOn after flatMap if you want to ensure that subsequent event processing happens on the desired thread (until there is another async boundary operator).
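A minimal sketch of that advice, assuming RxJava 2 (the io.reactivex package) is on the classpath. The helper name collectThreadNames and the choice of Schedulers.single() as the target scheduler are illustrative assumptions, not part of the original question. It records the thread that each item is observed on after the observeOn placed downstream of flatMap:

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: returns the name of the thread each item was seen on
// downstream of the observeOn that follows flatMap.
fun collectThreadNames(): List<String> {
    val threadNames = CopyOnWriteArrayList<String>()

    Observable.range(1, 9)
        .flatMap { value ->
            if (value < 5) {
                // Inner source subscribed on an io() thread, as in the question.
                Observable.just(value).subscribeOn(Schedulers.io())
            } else {
                // Inner source emits on whichever thread subscribes to it.
                Observable.just(value)
            }
        }
        // Async boundary after flatMap: operators below this line run on
        // the single() scheduler (chosen here only for illustration).
        .observeOn(Schedulers.single())
        .doOnNext { threadNames.add(Thread.currentThread().name) }
        // Block the caller until the stream completes, instead of Thread.sleep.
        .blockingSubscribe()

    return threadNames
}

fun main() {
    collectThreadNames().forEach { println(it) }
}
```

With observeOn moved after flatMap, every item should be reported on the same scheduler thread, although the order in which the items arrive can still vary between runs.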

akarnokd answered Apr 25 '23 11:04
