I'm experimenting with the following task to get my head around RxJava, so I tried it out in Kotlin:
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors

val ex = Executors.newFixedThreadPool(10)

Observable.fromIterable((1..100).toList())
    .observeOn(Schedulers.from(ex))
    .map { Thread.currentThread().name }
    .subscribe { println(it + " " + Thread.currentThread().name) }
I expected it to print
pool-1-thread-1 main
pool-1-thread-2 main
pool-1-thread-3 main
pool-1-thread-4 main
....
However it prints:
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
Can anyone correct my misunderstandings about how this works? Why does it not use all of the threads of the thread pool? How can I get my subscriber to run on the main thread or block until completion?
Rx is not meant to be a parallel execution service; use Java's Streams API for that. Rx events are synchronous and flow through the stream sequentially. When the stream is built, observeOn requests a single worker thread once and processes the emissions one by one on that thread.
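For comparison, here is a rough sketch (not from the original post) of the parallel-execution style that paragraph alludes to, using the JDK's parallel streams from Kotlin:

fun main() {
    // parallelStream() fans the work out over the common ForkJoinPool,
    // so the map runs on several worker threads at once.
    (1..100).toList().parallelStream()
        .map { Thread.currentThread().name }
        .forEach { println(it + " " + Thread.currentThread().name) }
}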
You also expected subscribe to be executed on the main thread. observeOn switches threads, and all downstream events happen on that thread. If you want to switch back to the main thread, you have to insert another observeOn just before subscribe.
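One way to address the "block until completion" part of the question (this is a sketch, assuming RxJava 2 and a plain JVM with no Android main-thread scheduler) is to replace subscribe with blockingSubscribe, which blocks the calling thread and delivers the results on it:

import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors

fun main() {
    val ex = Executors.newFixedThreadPool(10)

    Observable.fromIterable((1..100).toList())
        .observeOn(Schedulers.from(ex))
        .map { Thread.currentThread().name }
        // blockingSubscribe keeps the calling (main) thread blocked until the
        // stream completes and invokes this handler on that calling thread,
        // so the second column prints "main".
        .blockingSubscribe { println(it + " " + Thread.currentThread().name) }

    ex.shutdown()
}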
To make the code inside your map block run in parallel, you should wrap it in an inner Observable with its own scheduler:
val ex = Executors.newFixedThreadPool(10)
val scheduler = Schedulers.from(ex)

Observable.fromIterable((1..100).toList())
    .flatMap {
        Observable
            .fromCallable { Thread.currentThread().name }
            .subscribeOn(scheduler)
    }
    .subscribe { println(it + " " + Thread.currentThread().name) }
In this case, you will see output like:
pool-1-thread-1 pool-1-thread-1
pool-1-thread-2 pool-1-thread-1
pool-1-thread-3 pool-1-thread-1
pool-1-thread-4 pool-1-thread-1
...
You can check the article RxJava - Achieving Parallelization, which explains this behavior in more detail.
Also, RxJava 2.0.5 introduced the ParallelFlowable API.
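A minimal sketch of what that looks like (assuming RxJava 2.x and the same fixed thread pool as above):

import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors

fun main() {
    val ex = Executors.newFixedThreadPool(10)
    val scheduler = Schedulers.from(ex)

    Flowable.range(1, 100)
        .parallel()                  // split the sequence into rails (defaults to the CPU count)
        .runOn(scheduler)            // each rail gets its own worker from the pool
        .map { Thread.currentThread().name }
        .sequential()                // merge the rails back into a single Flowable
        .blockingSubscribe { println(it + " " + Thread.currentThread().name) }

    ex.shutdown()
}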