I'm writing a backend application in Kotlin.
To speed things up, I'm currently relying on RxKotlin on the server to do parallel execution of IO tasks such as database calls & API calls. The code usually looks like this.
val singleResult1 = Single.fromCallable {
    database.get(....)
}.io()
val singleResult2 = Single.fromCallable {
    database.update(....)
}.io()
Single.zip(singleResult1, singleResult2) { result1: Result1, result2: Result2 ->
    ....
}
    .flatMap {
        // other Rx calls
    }
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.computation())
    .blockingGet()
However, since I don't really work with multiple events (just singles), Rx feels a bit messy and adds a lot of boilerplate. It also causes complications if I want to return a null value, and it can sometimes mess up the stack trace.
I'm thinking of removing Rx and using Executors (or threads) for parallelism instead. Are there any performance considerations here?
An example of what I'm thinking of:
fun <T> waitAll(tasks: List<Callable<T>>, threadCount: Int = -1): List<T> {
    val threads = if (threadCount == -1) tasks.size else threadCount
    val executor = Executors.newFixedThreadPool(threads)
    val results = executor.invokeAll(tasks).map {
        it.get()
    }
    executor.shutdown()
    return results
}
And using it like so:
waitAll(listOf(callable1, callable2))
Or maybe use regular threads and join them?
threads.forEach {
    it.start()
}
threads.forEach {
    it.join()
}
Or why not streams?
listOf(callable1, callable2)
    .parallelStream()
    .map { it.call() }
    .collect(Collectors.toList())
Java executor services use threads, and RxKotlin uses executor services, so under the hood these approaches all do the same thing. The difference is in software architecture. Pick the one that integrates best with your code and it will do the job well. Simple is best.
If you have an event-based or observable-based architecture and you try to write your own library for event-based operations or jobs, you can easily get job separation or timing wrong. Use RxKotlin and don't reinvent the wheel.
If your work is not about events or the observable pattern and you just need to run jobs in parallel, just use executor services. RxKotlin would be over-engineering: you end up doing more than you need to.
So I think the question in this situation is not speed but architecture.
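As a rough illustration of the "just use executor services" route, here is a minimal sketch of how two results of different types could be combined without Rx. It swaps in the JDK's CompletableFuture in place of Single.zip, which is not something the question itself uses. The functions loadUserName, loadOrderCount and loadSummary are hypothetical stand-ins for the blocking database and API calls.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical stand-ins for two blocking IO calls with different result types.
fun loadUserName(): String {
    Thread.sleep(100)
    return "alice"
}

fun loadOrderCount(): Int {
    Thread.sleep(100)
    return 3
}

// Runs both calls on the given executor and combines the results,
// roughly what Single.zip(...).blockingGet() does in the Rx version.
fun loadSummary(executor: ExecutorService): String {
    val name = CompletableFuture.supplyAsync({ loadUserName() }, executor)
    val count = CompletableFuture.supplyAsync({ loadOrderCount() }, executor)
    return name.thenCombine(count) { n, c -> "$n has $c orders" }.join()
}

fun main() {
    val executor = Executors.newFixedThreadPool(2)
    try {
        println(loadSummary(executor)) // prints "alice has 3 orders"
    } finally {
        executor.shutdown()
    }
}
```

Unlike a List<Callable<T>>, this keeps each result's own type, at the cost of one more API to learn.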
RxKotlin itself uses executors, except that it has two pre-created thread pools, Schedulers.io() (unbounded) and Schedulers.computation() (bounded by the number of cores), and doesn't spin up a new one on each call as your suggested code does. You can obviously do the same manually:
private val executor = Executors.newCachedThreadPool() // equivalent to io()
fun <T> waitAll(tasks: List<Callable<T>>): List<T> {
    return executor.invokeAll(tasks).map {
        it.get()
    }
}
Generally speaking, this should be better than creating a thread for each task, because it allows existing threads to be reused, but it depends on your usage.
Coroutines are (IMO) more suitable for dealing with background/UI-thread communication (i.e. Android etc.)
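For illustration, a self-contained sketch of the shared-pool version in use might look like the following. The slowLookup function is a hypothetical stand-in for a blocking database or API call, and making the pool's threads daemon threads is an assumption added here so that the JVM can exit without an explicit shutdown.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Created once and reused by every call to waitAll.
// Assumption: daemon threads, so the pool never keeps the JVM alive on exit.
private val sharedExecutor: ExecutorService =
    Executors.newCachedThreadPool { runnable -> Thread(runnable).apply { isDaemon = true } }

// Submits all tasks, blocks until each has finished, and returns results in task order.
fun <T> waitAll(tasks: List<Callable<T>>): List<T> =
    sharedExecutor.invokeAll(tasks).map { it.get() }

// Hypothetical stand-in for a blocking database or API call.
fun slowLookup(id: Int): String {
    Thread.sleep(200)
    return "row-$id"
}

fun main() {
    val results = waitAll(listOf(Callable { slowLookup(1) }, Callable { slowLookup(2) }))
    println(results) // prints [row-1, row-2]
}
```

Because invokeAll returns its futures in the same order as the submitted tasks, the results line up with the input list regardless of which task finishes first.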
Whether coroutines are useful for this very much depends on what you have inside your tasks. A blocking API (e.g. JDBC)? They aren't. An async one (e.g. Retrofit)? They are.