 

Kotlin coroutines: use the main thread in runBlocking

I am trying to execute the following code:

 val jobs = listOf(...)
 return runBlocking(CommonPool) {
    val executed = jobs.map {
        async { it.execute() }
    }.toTypedArray()
    awaitAll(*executed)
 }

where jobs is a list of Suppliers; in the synchronous world this would simply produce, for example, a list of ints. Everything works, but the problem is that the main thread is not utilized, as this YourKit screenshot shows:

[Screenshot from YourKit]

So the question is: how can I also utilize the main thread?

I suppose runBlocking is the problem here, but is there another way to get the same result? With a Java parallel stream it looks much better, but the main thread is still not fully utilized (the tasks are completely independent).
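A minimal sketch of where coroutines inside runBlocking actually execute, assuming kotlinx.coroutines 1.x on the classpath (Dispatchers.Default stands in for CommonPool, which is deprecated in 1.x). The helper name whereDoTheyRun is hypothetical. A plain async with no dispatcher argument inherits runBlocking's event loop on the calling thread, while async with an explicit dispatcher runs on that dispatcher's pool:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports which thread each kind of coroutine ran on.
fun whereDoTheyRun(): Pair<Thread, Thread> = runBlocking {
    // No dispatcher argument: runs on runBlocking's event loop, i.e. the calling thread.
    val onCaller = async { Thread.currentThread() }
    // Explicit dispatcher: runs on a Dispatchers.Default worker thread instead.
    val onPool = async(Dispatchers.Default) { Thread.currentThread() }
    onCaller.await() to onPool.await()
}
```

Called from main, the first thread is main and the second is a pool worker. Passing a dispatcher to runBlocking itself, as runBlocking(CommonPool) does, runs the body and its children on that dispatcher while the calling thread only waits, which matches the idle main thread in the profiler.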

UPDATE

OK, maybe I haven't given you enough information. My question came up some time after watching Venkat Subramaniam's presentation: https://youtu.be/0hQvWIdwnw4. I need maximum performance: there is no I/O, no UI, etc., only computation. There is only a single request and I need to use all available resources.

One idea I have is to set the parallelism to the thread count + 1, but I think that is rather silly.
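The "+1" idea has a concrete basis: ForkJoinPool.commonPool(), which CommonPool typically used under the hood, defaults to a parallelism of one less than the number of available processors, on the assumption that the submitting thread helps out. A thread blocked in runBlocking does not help. A minimal sketch of the idea, assuming kotlinx.coroutines 1.x and a hypothetical ComputeJob interface standing in for the question's Suppliers, runs the jobs on a dedicated pool one thread larger than the common pool:

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.ForkJoinPool
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the question's Supplier-like jobs.
fun interface ComputeJob {
    fun execute(): Int
}

fun runAllWithExtraThread(jobs: List<ComputeJob>): List<Int> {
    // Common-pool parallelism + 1, which by default equals the core count on multi-core machines.
    val threads = ForkJoinPool.commonPool().parallelism + 1
    // use {} closes the dispatcher, which also shuts down the underlying executor.
    return Executors.newFixedThreadPool(threads).asCoroutineDispatcher().use { dispatcher ->
        runBlocking {
            // The caller blocks here; every job runs on the dedicated pool.
            // awaitAll() returns results in the same order as the jobs.
            jobs.map { job -> async(dispatcher) { job.execute() } }.awaitAll()
        }
    }
}
```

A dedicated pool is used here rather than the java.util.concurrent.ForkJoinPool.common.parallelism system property, because that property changes the common pool for every user in the JVM.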

Witold Kupś asked Jul 20 '18 at 21:07



2 Answers

I tested the solution with Java 8 parallel streams:

jobs.parallelStream().forEach { it.execute() }

I found the CPU utilization to be reliably at 100%. For reference, I used this computation job:

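import java.util.concurrent.ThreadLocalRandom

class MyJob {
    // CPU-bound work: between 0 and 999,999 floating-point multiplications
    fun execute(): Double {
        val rnd = ThreadLocalRandom.current()
        var d = 1.0
        (1..rnd.nextInt(1_000_000)).forEach { _ ->
            d *= 1 + rnd.nextDouble(0.0000001)
        }
        return d
    }
}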

Note that its duration varies randomly from zero up to the time it takes to perform roughly 1,000,000 FP multiplications.
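One plausible reason the parallel stream keeps every core busy: in OpenJDK's implementation, a parallel terminal operation such as forEach is run as a fork/join task that the calling thread invokes and works on itself, so the caller processes part of the data alongside the common-pool workers. The sketch below, with a hypothetical helper name, records which threads touched the elements; the calling thread is expected to be among them:

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: returns the set of threads that processed elements of a parallel stream.
fun threadsUsedByParallelStream(size: Int): Set<Thread> {
    // Thread-safe set, since elements are processed concurrently.
    val seen = ConcurrentHashMap.newKeySet<Thread>()
    List(size) { it }.parallelStream().forEach { _ ->
        seen.add(Thread.currentThread())
    }
    return seen
}
```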

Out of curiosity, I also studied the code you added to your question as the solution that works for you. I found a number of issues with it, such as:

  • accumulating all the results into a list instead of processing them as they become available
  • closing the result channel immediately after submitting the last job instead of waiting for all the results
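To illustrate both points with a different, plain-JDK technique (java.util.concurrent.ExecutorCompletionService, swapped in here; it is not the channel-based code below), the hypothetical sumAsCompleted folds each result into a running sum as soon as its task finishes, and only returns after taking exactly as many results as it submitted:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorCompletionService
import java.util.concurrent.Executors

// Hypothetical helper: sums task results in completion order rather than submission order.
fun sumAsCompleted(tasks: List<Callable<Double>>, threads: Int): Double {
    val pool = Executors.newFixedThreadPool(threads)
    try {
        val completion = ExecutorCompletionService<Double>(pool)
        tasks.forEach { completion.submit(it) }
        var sum = 0.0
        // take() blocks until the next finished task is available; no intermediate list is built.
        // Taking tasks.size results guarantees we wait for all of them.
        repeat(tasks.size) { sum += completion.take().get() }
        return sum
    } finally {
        pool.shutdown()
    }
}
```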

I wrote my own version and added code to benchmark the Stream API one-liner against it. Here it is:

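import java.util.concurrent.ForkJoinPool.commonPool
import java.util.stream.Collectors.summingDouble
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

const val NUM_JOBS = 1000
val jobs = (0 until NUM_JOBS).map { MyJob() }


// Baseline: the Stream API one-liner, summing all results
fun parallelStream(): Double =
        jobs.parallelStream().map { it.execute() }.collect(summingDouble { it })

fun channels(): Double {
    // Every worker, including the main thread, sends its results here
    val resultChannel = Channel<Double>(UNLIMITED)

    // Rendezvous channel whose jobs are executed by the main thread
    val mainComputeChannel = Channel<MyJob>()
    // One actor per common-pool thread, each executing the jobs it receives
    val poolComputeChannels = (1..commonPool().parallelism).map { _ ->
        GlobalScope.actor<MyJob>(Dispatchers.Default) {
            for (job in channel) {
                job.execute().also { resultChannel.send(it) }
            }
        }
    }
    val allComputeChannels = poolComputeChannels + mainComputeChannel

    // Launch a coroutine that hands each job to whichever channel accepts it first
    GlobalScope.launch {
        jobs.forEach { job ->
            select<Unit> {
                allComputeChannels.forEach { chan ->
                    chan.onSend(job) {}
                }
            }
        }
        // Close the actors' channels so the actors finish instead of leaking across calls
        poolComputeChannels.forEach { it.close() }
    }

    // Run the main loop which takes turns between running a job
    // submitted to the main thread channel and receiving a result
    return runBlocking {
        var completedCount = 0
        var sum = 0.0
        while (completedCount < NUM_JOBS) {
            select<Unit> {
                mainComputeChannel.onReceive { job ->
                    job.execute().also { resultChannel.send(it) }
                }
                resultChannel.onReceive { result ->
                    sum += result
                    completedCount++
                }
            }
        }
        sum
    }
}

fun main(args: Array<String>) {
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
}

// 20 warm-up runs, then the average wall-clock time of 20 measured runs
fun measure(task: String, measuredCode: () -> Double) {
    val block = { print(measuredCode().toString().substringBefore('.')) }
    println("Warming up $task")
    (1..20).forEach { _ -> block() }
    println("\nMeasuring $task")
    val average = (1..20).map { measureTimeMillis(block) }.average()
    println("\n$task took $average ms")
}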

Here's my typical result:

Parallel Stream took 396.85 ms
Channels took 398.1 ms

The results are similar, but one line of code still beats 50 lines of code :)

Marko Topolnik answered Oct 13 '22 at 22:10



Just because no work is being run on this particular thread doesn't mean that the device is not running other threads on the same core.

It's actually better to keep your main thread idle; that makes your UI more responsive.

Pawel answered Oct 13 '22 at 22:10
