
Kotlin: How to async await a list of identical methods?

Tags:

kotlin

I have a few hundred Java class instances that all need to complete their .calculate() method or die within 10 minutes. They will hog CPU and memory, so I'd like to allow only 5 (threads?) at once. I believe I'm close, but coming from a Java background I'm not yet familiar enough with Kotlin coroutines (vs. Java ExecutorServices) to make this compile.

// ...my logic to create a stream of identical class type instances 
// that all have a vanilla blocking .calculate():Double method...
// which I now want to (maybe?) map to Jobs

listOf(MyClass(1), MyClass(2), MyClass(1000))
.map {
  launch(CommonPool) {
    val errorRate: Double? = it?.calculate()
    println("${it?.javaClass?.simpleName}  $errorRate") // desired output
    errorRate
  }
}
.collect(Collectors.toList<Job>())

jobs.forEach {
    println(it.join())
}

And then I think I need to wrap the calculate with a non-blocking calculate? Or a blocking, but timeout limited? Should that "runBlocking" be there? Better as a lambda in the above code?

fun MyClass.calculateTimeLimited(): Double = runBlocking {
  withTimeout(TIMEOUT) {
    this.calculate() // <-- doesn't compile! "this" is "CoroutineScope"
  }
}

Benjamin H asked Aug 15 '17 20:08


2 Answers

My take (sorry, I can't test it on my machine):

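// Sequences are lazy: toList() forces every async to start
// before the first await(), so the work actually runs in parallel.
val results = streamOfInstances.asSequence().map {
    async(CommonPool) {
        val errorRate: Double? = it?.calculate()
        println("${it?.javaClass?.simpleName}  $errorRate")
        errorRate
    }
}.toList()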

runBlocking {
    results.forEach {
        println(it.await())
    }
}  

Key difference from your code:

  • I use async instead of launch since I am collecting the result
  • The blocking operation (join() or await()) is inside runBlocking{}
  • I use Kotlin's map, not the JDK8 API
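
For readers on a current kotlinx.coroutines release: CommonPool and the kotlinx.coroutines.experimental package no longer exist, and async has to be called on a CoroutineScope. A minimal sketch of the three points above with the 1.x API might look like the following. MyClass is a hypothetical stand-in for the asker's class, and calculateAll is a name made up for this illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the asker's class with a blocking calculate().
class MyClass(private val seed: Int) {
    fun calculate(): Double = seed / 2.0
}

// Starts one coroutine per instance and blocks the caller until all results are in.
fun calculateAll(instances: List<MyClass>): List<Double> = runBlocking {
    instances
        // List.map is eager, so every async is started here, before any await.
        .map { instance -> async(Dispatchers.Default) { instance.calculate() } }
        // awaitAll suspends until every Deferred completes; results keep input order.
        .awaitAll()
}

fun main() {
    println(calculateAll(listOf(MyClass(1), MyClass(2), MyClass(1000)))) // [0.5, 1.0, 500.0]
}
```

This sketch covers only collecting results; it does not limit concurrency to 5 or apply the 10 minute deadline.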
voddan answered Oct 05 '22 22:10


I don't know if you are aware of it, but there is a great document: coroutines by example. I linked the specific section on cancellation and timeouts. Below is my implementation of your problem:

import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.newSingleThreadContext
import java.util.*
import kotlin.system.measureNanoTime

internal val random = Random()

fun createRandArray(n: Long): Array<Int> {
    return createRandTArray(n).toTypedArray()
}

fun createRandTArray(n: Long): IntArray {
    return random.ints(n, 0, n.toInt()).toArray()
}

var size: Long = 1_000_000
var nRepeats: Int = 11
var nDrops: Int = 1

fun <T> benchmark(name: String,
                  buildSortable: (Long) -> T,
                  sort: (T) -> Any) {
    val arrays = List(nRepeats) { buildSortable(size) }
    val timesMS = arrays.map { measureNanoTime { sort(it) } / 1_000_000 }.drop(nDrops) // for JVM optimization warmup
    // println(timesMS)
    println("[$name] Average sort time for array of size $size: ${timesMS.average() } ms")
}

fun main(args: Array<String>) {
    size = 1_000_000
    nRepeats = 6

    benchmark("Array<Int>",
            buildSortable = { size -> createRandTArray(size).toTypedArray() },
            sort = ::mergeSort)

    benchmark("ListLike Array<Int>",
            buildSortable = { size -> SortingArray(createRandTArray(size).toTypedArray()) },
            sort = { array -> mergeSortLL(array) })  // oddly ::mergeSortLL is refused

    benchmark("ListLike IntArray",
            buildSortable = { size -> createRandTArray(size).asComparableList() },
            sort = { array -> mergeSortLL(array) })  // oddly ::mergeSortLL is refused

    benchmark("IntArray",
            buildSortable = { size -> createRandTArray(size) },
            sort = ::mergeSortIA)

    benchmark("IntArray multi-thread (CommonPool) with many array copies",
            buildSortable = { size -> createRandTArray(size) },
            sort = { mergeSortCorou(it) })

    val origContext = corouContext
    corouContext = newSingleThreadContext("single thread")
    benchmark("IntArray multi-thread (one thread!) with many array copies",
            buildSortable = { size -> createRandTArray(size) },
            sort = { mergeSortCorou(it) })
    corouContext = origContext

    benchmark("Java int[]",
            buildSortable = { size -> createRandTArray(size) },
            sort = { MergeSorter.sort(it) })
}

I got the output:

 Hit the timeout (CancellationException).
 Out of 100 computations, 45 completed.

You can play with the timeOut value (currently 500 ms). Each job has a different random execution time between 0 and 1000 ms, so roughly half of them finish before the timeout.
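
The experiment described here (100 jobs, each delayed by a random 0 to 1000 ms, cut off after 500 ms) can be sketched roughly as follows with the current kotlinx.coroutines API. This is a reconstruction under assumptions, not the answer's original code: runExperiment and its parameters are invented names, and it uses withTimeoutOrNull instead of catching a CancellationException.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
import java.util.concurrent.atomic.AtomicInteger
import kotlin.random.Random

// Launches jobCount jobs that each "work" for a random time below maxDelayMs
// and returns how many of them finished before timeoutMs elapsed.
fun runExperiment(jobCount: Int, timeoutMs: Long, maxDelayMs: Long): Int = runBlocking {
    val completed = AtomicInteger(0)
    val finishedInTime = withTimeoutOrNull(timeoutMs) {
        List(jobCount) {
            launch {
                // delay is a suspending call, so this job can be cancelled while waiting.
                delay(Random.nextLong(maxDelayMs))
                completed.incrementAndGet()
            }
        }.joinAll()
        true
    }
    // null means the timeout fired and the remaining child jobs were cancelled.
    if (finishedInTime == null) println("Hit the timeout.")
    completed.get()
}

fun main() {
    val done = runExperiment(jobCount = 100, timeoutMs = 500, maxDelayMs = 1_000)
    println("Out of 100 computations, $done completed.")
}
```

The count varies from run to run because the delays are random.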

However, you might have a harder time implementing this for your specific problem, because your computations have to be cancellable. Read the section on cancellation carefully in the document I linked above. Basically, your computation has to either call one of the suspending functions in kotlinx.coroutines (this is what I did, since I call delay), call yield, or check isActive.
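
For a CPU-bound loop that never suspends, the cooperative check could look like the sketch below. It assumes a current kotlinx.coroutines release; sumUntilCancelled is a hypothetical computation invented for the illustration, and ensureActive() is used as shorthand for checking isActive and throwing a CancellationException.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical CPU-bound computation: sums 1..limit on a background thread.
suspend fun sumUntilCancelled(limit: Long): Long = withContext(Dispatchers.Default) {
    var sum = 0L
    for (i in 1..limit) {
        // Without this periodic check the loop would ignore cancellation
        // and keep running long after the timeout.
        if (i % 1_000_000L == 0L) ensureActive()
        sum += i
    }
    sum
}

fun main() = runBlocking<Unit> {
    // The loop cannot finish in 100 ms, so the check above aborts it.
    val aborted = withTimeoutOrNull(100) { sumUntilCancelled(Long.MAX_VALUE) }
    println(aborted ?: "cancelled")
    // A short computation completes normally.
    println(withTimeoutOrNull(10_000) { sumUntilCancelled(10) }) // 55
}
```

A third-party calculate() that contains no such check cannot be stopped this way; the timeout would only take effect once the call returns.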


EDIT, in response to the comment about cancelling an arbitrary job (non-cancellable/non-suspendable):

No, there is no magic here. Making a computation truly cancellable is very difficult, no matter what framework you use. Java notoriously has Thread.stop(), which seems to do what you want, but it was deprecated because it is inherently unsafe.

I tried to use coroutines to solve the simpler problem of no longer submitting new jobs after the timeout, while letting jobs that started before the timeout run past it without being cancelled or interrupted. I spent some time on it and could not find an easy solution with coroutines. It could be done with the standard Java concurrency constructs.
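
One way the standard Java constructs could be used from Kotlin is a fixed thread pool plus ExecutorService.invokeAll with a timeout. The sketch below is an assumption about what such a solution might look like, not a tested answer to the original problem, and runAllWithin is an invented name. Note that invokeAll does send an interrupt to tasks still running at the deadline; a blocking calculate() that never checks the interrupt flag will ignore it and keep running, while tasks that had not started yet are never run.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Runs the tasks on at most `threads` threads and waits up to `timeout`.
// Returns one entry per task: its result, or null if it failed or missed the deadline.
fun <T> runAllWithin(tasks: List<() -> T>, threads: Int, timeout: Long, unit: TimeUnit): List<T?> {
    val pool = Executors.newFixedThreadPool(threads)
    try {
        // invokeAll blocks until every task is done or the timeout expires;
        // on return, tasks that have not completed are cancelled.
        val futures = pool.invokeAll(tasks.map { task -> Callable<T> { task() } }, timeout, unit)
        return futures.map { future ->
            if (future.isCancelled) null else runCatching { future.get() }.getOrNull()
        }
    } finally {
        // Stop accepting work; threads stuck in uninterruptible tasks still run to the end.
        pool.shutdownNow()
    }
}

fun main() {
    val results = runAllWithin(
        tasks = listOf({ 1.5 }, { Thread.sleep(2_000); 2.5 }),
        threads = 5, timeout = 200, unit = TimeUnit.MILLISECONDS
    )
    println(results) // [1.5, null]
}
```

For the question's case the call would be along the lines of runAllWithin(instances.map { { it.calculate() } }, 5, 10, TimeUnit.MINUTES), where instances is the asker's list.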

toto2 answered Oct 05 '22 23:10