I am trying to execute the following code:
val jobs = listOf(...)
return runBlocking(CommonPool) {
    val executed = jobs.map {
        async { it.execute() }
    }.toTypedArray()
    awaitAll(*executed)
}
where jobs is a list of some Suppliers. In a synchronous world this should just create, for example, a list of ints.
Everything works fine, but the problem is that the main thread is not utilized (this is visible in a YourKit profile).
I suppose runBlocking is the problem here, but is there another way to get the same result? With a Java parallel stream it looks far better, but the main thread is still not fully utilized (the tasks are totally independent).
OK, maybe I have told you too little. My question came up some time after watching a Venkat Subramaniam presentation: https://youtu.be/0hQvWIdwnw4. I need maximum performance; there is no IO, no UI, etc. Only computations. There is only one request and I need to use all my available resources.
One idea I have is to set the parallelism to thread count + 1, but I think that is rather silly.
I tested the solution with Java 8 parallel streams:
jobs.parallelStream().forEach { it.execute() }
I found the CPU utilization to be reliably at 100%. For reference, I used this computation job:
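import java.util.concurrent.ThreadLocalRandom

class MyJob {
    fun execute(): Double {
        val rnd = ThreadLocalRandom.current()
        var d = 1.0
        // Random number of iterations (below 1,000,000), one multiplication each
        (1..rnd.nextInt(1_000_000)).forEach { _ ->
            d *= 1 + rnd.nextDouble(0.0000001)
        }
        return d
    }
}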
Note that its duration varies randomly from zero up to the time it takes to perform 1,000,000 FP multiplications (the bound passed to rnd.nextInt).
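To check whether a parallel stream also runs work on the calling thread, one option is to record the name of every thread that processes an element. The following is only a minimal sketch using the JDK alone; threadsUsedByParallelStream is a hypothetical helper name, not part of the code discussed here, and the list of 10,000 integers is an arbitrary stand-in for the real jobs.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: returns the names of all threads that processed
// at least one element of the parallel stream.
fun threadsUsedByParallelStream(items: List<Int>): Set<String> {
    // Thread-safe set, because several threads add to it concurrently
    val names = ConcurrentHashMap.newKeySet<String>()
    items.parallelStream().forEach { names.add(Thread.currentThread().name) }
    return names
}

fun main() {
    val caller = Thread.currentThread().name
    val used = threadsUsedByParallelStream((1..10_000).toList())
    println("caller = $caller")
    println("threads used = $used")
}
```

If the caller's name shows up in the printed set, the thread that started the stream took part in the computation alongside the pool's worker threads.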
Out of curiosity I also studied the code you added to your question as the solution that works for you, and I found a number of issues with it.
I wrote some code of my own and added code to benchmark the Stream API one-liner against it. Here it is:
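import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.selects.select
import java.util.concurrent.ForkJoinPool.commonPool
import java.util.stream.Collectors.summingDouble
import kotlin.system.measureTimeMillis

const val NUM_JOBS = 1000
val jobs = (0 until NUM_JOBS).map { MyJob() }

fun parallelStream(): Double =
    jobs.parallelStream().map { it.execute() }.collect(summingDouble { it })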

fun channels(): Double {
    val resultChannel = Channel<Double>(UNLIMITED)
    val mainComputeChannel = Channel<MyJob>()
    // One actor per common-pool thread; each runs the jobs it receives
    val poolComputeChannels = (1..commonPool().parallelism).map { _ ->
        GlobalScope.actor<MyJob>(Dispatchers.Default) {
            for (job in channel) {
                job.execute().also { resultChannel.send(it) }
            }
        }
    }
    val allComputeChannels = poolComputeChannels + mainComputeChannel

    // Launch a coroutine that submits the jobs
    GlobalScope.launch {
        jobs.forEach { job ->
            // Hand the job to whichever channel is ready to accept it first
            select<Unit> {
                allComputeChannels.forEach { chan ->
                    chan.onSend(job) {}
                }
            }
        }
    }

    // Run the main loop which takes turns between running a job
    // submitted to the main thread channel and receiving a result
    return runBlocking {
        var completedCount = 0
        var sum = 0.0
        while (completedCount < NUM_JOBS) {
            select<Unit> {
                mainComputeChannel.onReceive { job ->
                    job.execute().also { resultChannel.send(it) }
                }
                resultChannel.onReceive { result ->
                    sum += result
                    completedCount++
                }
            }
        }
        sum
    }
}

fun main(args: Array<String>) {
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
}

fun measure(task: String, measuredCode: () -> Double) {
    // Print the integer part of the result so the computation cannot be optimized away
    val block = { print(measuredCode().toString().substringBefore('.')) }
    println("Warming up $task")
    (1..20).forEach { _ -> block() }
    println("\nMeasuring $task")
    val average = (1..20).map { measureTimeMillis(block) }.average()
    println("\n$task took $average ms")
}
Here's my typical result:
Parallel Stream took 396.85 ms
Channels took 398.1 ms
The results are similar, but one line of code still beats 50 lines of code :)
Just because there is no work being run on this explicit thread doesn't mean that the device is not running other threads on the same core.
It's actually better to have your main thread idle; that will make your UI more responsive.
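One way to look at this on a particular machine is to compare the number of cores with the number of worker threads in the shared pool. The sketch below assumes the JVM's common ForkJoinPool (the pool that parallel streams use), whose default parallelism is normally one less than the number of available processors because the submitting thread helps with the work; parallelismReport is a hypothetical helper name introduced only for this illustration.

```kotlin
import java.util.concurrent.ForkJoinPool

// Hypothetical helper: returns (available cores, common-pool parallelism).
fun parallelismReport(): Pair<Int, Int> {
    val cores = Runtime.getRuntime().availableProcessors()
    val poolParallelism = ForkJoinPool.commonPool().parallelism
    return cores to poolParallelism
}

fun main() {
    val (cores, poolParallelism) = parallelismReport()
    println("available cores = $cores")
    println("common pool parallelism = $poolParallelism")
}
```

If a pool already has as many worker threads as there are cores, an idle main thread in the profiler does not by itself mean that a core is sitting idle.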