
rxjava2 - simple example of executing tasks on a thread pool, subscribing on a single thread

I'm experimenting with the following task to get my head around RxJava:

  • Given a list of URLs
  • Do an HTTP request for each URL on a thread pool
  • For each result insert some data into an SQLite database (no multithreading here)
  • Block the method until it completes

So I tried it out in Kotlin:

val ex = Executors.newFixedThreadPool(10)
Observable.fromIterable((1..100).toList())
    .observeOn(Schedulers.from(ex))
    .map { Thread.currentThread().name }
    .subscribe { println(it + " " + Thread.currentThread().name) }

I expected it to print

pool-1-thread-1 main
pool-1-thread-2 main
pool-1-thread-3 main
pool-1-thread-4 main
....

However it prints:

pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1
pool-1-thread-1 pool-1-thread-1

Can anyone correct my misunderstandings about how this works? Why does it not use all of the threads of the thread pool? How can I get my subscriber to run on the main thread or block until completion?

bcoughlan asked Mar 08 '23 04:03


2 Answers

Rx is not meant to be a parallel execution service; use Java's Streams API for that. Rx events are synchronous and flow through the stream sequentially, one after another. When the stream is built, observeOn requests a thread once and processes the emissions one by one on that thread, which is why only a single pool thread shows up in your output.

You also expected subscribe to be executed on the main thread. observeOn switches the threads and all downstream events happen on that thread. If you want to switch to the main thread, you will have to insert another observeOn just before subscribe.
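On a plain JVM there is no built-in scheduler for the main thread, so one possible way to get the consumer onto the calling thread and block until completion is blockingSubscribe. The following is a minimal sketch, not a definitive solution; the function name threadsSeen and the reduced range of 5 items are assumptions made for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors

// Hypothetical helper: returns pairs of
// (thread that ran map, thread that ran the consumer).
fun threadsSeen(): List<Pair<String, String>> {
    val ex = Executors.newFixedThreadPool(10)
    val seen = mutableListOf<Pair<String, String>>()
    try {
        Observable.fromIterable((1..5).toList())
            // Everything downstream of observeOn runs on the pool...
            .observeOn(Schedulers.from(ex))
            .map { Thread.currentThread().name }
            // ...except the consumer: blockingSubscribe invokes it on the
            // calling thread and returns only after the stream completes.
            .blockingSubscribe { mapThread ->
                seen.add(mapThread to Thread.currentThread().name)
            }
    } finally {
        // Shut the pool down so its threads do not keep the JVM alive.
        ex.shutdown()
    }
    return seen
}

fun main() {
    threadsSeen().forEach { (mapThread, consumerThread) ->
        println("$mapThread $consumerThread")
    }
}
```

The map step here is still sequential, as described above; only the thread that runs the consumer changes.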

nhaarman answered Mar 10 '23 18:03


To make the code inside your map block run in parallel, wrap it in its own Observable that is subscribed on the scheduler:

val ex = Executors.newFixedThreadPool(10)
val scheduler = Schedulers.from(ex)
Observable.fromIterable((1..100).toList())
    .flatMap {
        // Each item gets its own inner Observable, subscribed on the pool,
        // so the callables can run on different pool threads.
        Observable
            .fromCallable { Thread.currentThread().name }
            .subscribeOn(scheduler)
    }
    .subscribe { println(it + " " + Thread.currentThread().name) }

In this case, you will see output along these lines (the exact thread names can vary between runs):

pool-1-thread-1 pool-1-thread-1
pool-1-thread-2 pool-1-thread-1
pool-1-thread-3 pool-1-thread-1
pool-1-thread-4 pool-1-thread-1
...

The article "RxJava - Achieving Parallelization" explains this behavior in more detail.
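Applied to the task from the question, the flatMap approach could look roughly like the sketch below. It is only an illustration under assumptions: fakeFetch is a hypothetical stand-in for the real HTTP request, fetchAll is a made-up name, and the consumer passed to blockingSubscribe marks the spot where the single-threaded SQLite insert would go.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors

// Hypothetical stand-in for an HTTP request; it only records
// which thread did the work.
fun fakeFetch(url: String): String =
    "$url fetched on ${Thread.currentThread().name}"

// Hypothetical helper: runs fakeFetch for every URL on a thread pool and
// collects the results on the calling thread, blocking until all are done.
fun fetchAll(urls: List<String>): List<String> {
    val ex = Executors.newFixedThreadPool(10)
    val scheduler = Schedulers.from(ex)
    val results = mutableListOf<String>()
    try {
        Observable.fromIterable(urls)
            .flatMap { url ->
                // One inner Observable per URL, subscribed on the pool.
                Observable.fromCallable { fakeFetch(url) }
                    .subscribeOn(scheduler)
            }
            // The consumer runs on the calling thread, one item at a time;
            // a database insert could go here instead of results.add.
            .blockingSubscribe { results.add(it) }
    } finally {
        ex.shutdown()
    }
    return results
}

fun main() {
    fetchAll(listOf("a", "b", "c")).forEach(::println)
}
```

Note that flatMap does not preserve the source order, so results may arrive in a different order than the URLs were listed.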

Also, RxJava 2.0.5 introduced the ParallelFlowable API.
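As a rough sketch of what using ParallelFlowable might look like: the function name squaresInParallel, the squaring step, and the choice of 4 rails are assumptions picked for illustration, not anything prescribed by the library.

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors

// Hypothetical helper: squares 1..n on several rails in parallel and
// collects the results on the calling thread.
fun squaresInParallel(n: Int): List<Int> {
    val ex = Executors.newFixedThreadPool(4)
    val out = mutableListOf<Int>()
    try {
        Flowable.range(1, n)
            .parallel(4)                 // split the flow into 4 rails
            .runOn(Schedulers.from(ex))  // run each rail on the pool
            .map { it * it }             // this step runs per rail
            .sequential()                // merge the rails into one flow
            .blockingSubscribe { out.add(it) } // consume on calling thread
    } finally {
        ex.shutdown()
    }
    return out
}

fun main() {
    println(squaresInParallel(10).sorted())
}
```

The merged flow does not keep the original order, which is why the usage example sorts the result before printing.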

gildor answered Mar 10 '23 17:03