I have a few hundred Java class instances that all need to complete their .calculate() method or die within 10 minutes. They hog CPU and memory, so I'd like to allow only 5 (threads?) at once. I believe I'm close, but coming from a Java background I'm not yet familiar enough with Kotlin coroutines (vs. Java ExecutorServices) to make this compile.
// ...my logic to create a stream of identical class type instances
// that all have a vanilla blocking .calculate():Double method...
// which I now want to (maybe?) map to Jobs
listOf(MyClass(1), MyClass(2), MyClass(1000))
        .map {
            launch(CommonPool) {
                val errorRate: Double? = it?.calculate()
                println("${it?.javaClass?.simpleName} $errorRate") // desired output
                errorRate
            }
        }
        .collect(Collectors.toList<Job>())
jobs.forEach {
    println(it.join())
}
And then I think I need to wrap calculate() in a non-blocking calculate? Or in a blocking but time-limited one? Should that runBlocking be there? Would it be better as a lambda in the code above?
fun MyClass.calculateTimeLimited(): Double = runBlocking {
    withTimeout(TIMEOUT) {
        this.calculate() // <-- doesn't compile! "this" is "CoroutineScope"
    }
}
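Inside the runBlocking { } / withTimeout(...) { } lambdas, plain this is the lambda's receiver (a CoroutineScope), so it shadows the MyClass receiver of the extension function. Kotlin's qualified this@calculateTimeLimited reaches the outer receiver; plain calculate() without "this." would also resolve, because CoroutineScope has no member of that name. The sketch below reproduces the situation without kotlinx.coroutines: Scope and inScope are hypothetical stand-ins for CoroutineScope and runBlocking, and the body of calculate() is a placeholder.

```kotlin
// Hypothetical stand-ins so this runs without kotlinx.coroutines:
// Scope plays the role of CoroutineScope, inScope the role of runBlocking.
class Scope

fun <R> inScope(block: Scope.() -> R): R = Scope().block()

class MyClass(private val seed: Int) {
    fun calculate(): Double = seed * 1.5 // placeholder computation
}

fun MyClass.calculateTimeLimited(): Double = inScope {
    // Plain `this` is the Scope here; the label selects the MyClass receiver.
    this@calculateTimeLimited.calculate()
}
```

This only fixes compilation. withTimeout cancels cooperatively, so it cannot stop a blocking calculate() that never suspends or checks for cancellation.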
My take (sorry, I can't test it on my machine):
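import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.runBlocking
import kotlin.streams.asSequence // Stream.asSequence() from the Kotlin stdlib's JDK 8 extensions
val results = streamOfInstances.asSequence().map {
    async(CommonPool) {
        val errorRate: Double? = it?.calculate()
        println("${it?.javaClass?.simpleName} $errorRate")
        errorRate
    }
}.toList() // start every job now; a lazy Sequence would start each one only when forEach reaches it
runBlocking {
    results.forEach {
        println(it.await())
    }
}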
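A subtlety in the .map { async(...) } chain above: Sequence.map is lazy, so if results stays a Sequence, each async(...) is only created when forEach reaches it and is awaited before the next one is created, which makes the jobs run one after another. Materializing the sequence with .toList() starts them all up front. The JDK-only sketch below shows the difference; CompletableFuture.supplyAsync stands in for async (an illustrative substitution, not the coroutine API), and startedBeforeFirstAwait is a hypothetical helper.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Counts how many jobs had been started by the time the first one is awaited.
// CompletableFuture.supplyAsync stands in for async(...) (illustrative only).
fun startedBeforeFirstAwait(jobCount: Int, materialize: Boolean): Int {
    val pool = Executors.newFixedThreadPool(jobCount)
    val started = AtomicInteger(0)
    try {
        val lazyJobs = (1..jobCount).asSequence().map { n ->
            started.incrementAndGet()
            CompletableFuture.supplyAsync({ n * 2 }, pool)
        }
        // toList() runs the map for every element now; the bare Sequence defers it.
        val jobs: Sequence<CompletableFuture<Int>> =
            if (materialize) lazyJobs.toList().asSequence() else lazyJobs
        var seenAtFirstAwait = -1
        for (job in jobs) {
            if (seenAtFirstAwait < 0) seenAtFirstAwait = started.get()
            job.join() // the equivalent of await()
        }
        return seenAtFirstAwait
    } finally {
        pool.shutdown()
    }
}
```

With the lazy sequence only one job exists when the first join() happens; with toList() all of them do.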
Key differences from your code:
- async instead of launch, since I am collecting the results
- the blocking call (join() or await()) is inside runBlocking {}
- Kotlin's map, not the JDK 8 Stream API
I don't know if you are aware, but there is this great document: coroutines by example. I linked the specific section on cancellation and timeouts. Below is my implementation of your problem:
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.newSingleThreadContext
import java.util.*
import kotlin.system.measureNanoTime
internal val random = Random()
fun createRandArray(n: Long): Array<Int> {
    return createRandTArray(n).toTypedArray()
}
fun createRandTArray(n: Long): IntArray {
    return random.ints(n, 0, n.toInt()).toArray()
}
var size: Long = 1_000_000
var nRepeats: Int = 11
var nDrops: Int = 1
fun <T> benchmark(name: String,
                  buildSortable: (Long) -> T,
                  sort: (T) -> Any) {
    val arrays = List(nRepeats) { buildSortable(size) }
    val timesMS = arrays.map { measureNanoTime { sort(it) } / 1_000_000 }.drop(nDrops) // for JVM optimization warmup
    // println(timesMS)
    println("[$name] Average sort time for array of size $size: ${timesMS.average()} ms")
}
fun main(args: Array<String>) {
    size = 1_000_000
    nRepeats = 6
    benchmark("Array<Int>",
        buildSortable = { size -> createRandTArray(size).toTypedArray() },
        sort = ::mergeSort)
    benchmark("ListLike Array<Int>",
        buildSortable = { size -> SortingArray(createRandTArray(size).toTypedArray()) },
        sort = { array -> mergeSortLL(array) }) // oddly ::mergeSortLL is refused
    benchmark("ListLike IntArray",
        buildSortable = { size -> createRandTArray(size).asComparableList() },
        sort = { array -> mergeSortLL(array) }) // oddly ::mergeSortLL is refused
    benchmark("IntArray",
        buildSortable = { size -> createRandTArray(size) },
        sort = ::mergeSortIA)
    benchmark("IntArray multi-thread (CommonPool) with many array copies",
        buildSortable = { size -> createRandTArray(size) },
        sort = { mergeSortCorou(it) })
    val origContext = corouContext
    corouContext = newSingleThreadContext("single thread")
    benchmark("IntArray multi-thread (one thread!) with many array copies",
        buildSortable = { size -> createRandTArray(size) },
        sort = { mergeSortCorou(it) })
    corouContext = origContext
    benchmark("Java int[]",
        buildSortable = { size -> createRandTArray(size) },
        sort = { MergeSorter.sort(it) })
}
I got the output:
Hit the timeout (CancellationException).
Out of 100 computations, 45 completed.
You can play with the timeOut value (currently 500 ms). Each job has a different random execution time from 0 to 1000 ms, so around half of them complete before the timeout.
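For comparison, here is a JDK analog of that experiment using the ExecutorService tools mentioned in the question: a fixed pool caps how many tasks run at once, and ExecutorService.invokeAll(tasks, timeout, unit) waits for all of them up to an overall timeout and cancels whatever hasn't finished. Thread.sleep stands in for the computation because, like delay(), it responds to cancellation (here via interruption). countCompleted is a hypothetical helper and this is a sketch, not the code that produced the output above.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Runs one sleeping task per entry in durationsMs, at most `parallelism` at a time,
// and returns how many finished before the overall timeout.
fun countCompleted(durationsMs: List<Long>, timeoutMs: Long, parallelism: Int): Int {
    val pool = Executors.newFixedThreadPool(parallelism)
    try {
        val tasks = durationsMs.map { ms -> Callable { Thread.sleep(ms); ms } }
        // invokeAll blocks until every task is done or the timeout expires; tasks that
        // have not completed by then (running or still queued) are cancelled.
        val futures = pool.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS)
        return futures.count { !it.isCancelled }
    } finally {
        pool.shutdownNow() // interrupt any worker that is still sleeping
    }
}
```

To mirror the experiment, pass something like List(100) { Random.nextLong(0, 1000) } (kotlin.random.Random) with timeoutMs = 500 and parallelism = 100; for the original question, parallelism = 5 and timeoutMs = TimeUnit.MINUTES.toMillis(10). Note that this timeout covers the whole batch, including time spent waiting in the queue, not each task individually.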
However, you might have a harder time implementing this for your specific problem: your computations have to be cancellable. Read the section on cancellation in the document I linked above carefully. Basically, your computation has to either call one of the suspending functions in kotlinx.coroutines (this is what I did, since I call delay), or periodically call yield() or check isActive.
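The same rule holds outside coroutines. In plain JDK code, Future.cancel(true) and shutdownNow() only set the worker thread's interrupt flag; a CPU-bound loop stops only if it polls that flag, which plays the role isActive plays in a coroutine. A sketch, with a hypothetical cancellableCalculate standing in for MyClass.calculate():

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import kotlin.math.sqrt

// Hypothetical CPU-bound stand-in for MyClass.calculate(). It never sleeps or
// suspends, so the only way to stop it is the interrupt flag it polls on each pass.
fun cancellableCalculate(steps: AtomicLong): Double {
    var acc = 0.0
    while (!Thread.currentThread().isInterrupted) {
        acc += sqrt(steps.incrementAndGet().toDouble())
    }
    return acc
}

// Starts the calculation, lets it run for runMs, cancels it with interruption,
// and reports whether the worker thread actually finished within graceMs.
fun stopsWhenCancelled(runMs: Long, graceMs: Long): Boolean {
    val pool = Executors.newSingleThreadExecutor()
    val steps = AtomicLong()
    val future = pool.submit(Callable { cancellableCalculate(steps) })
    Thread.sleep(runMs)
    future.cancel(true) // sets the worker's interrupt flag; it does not force-stop the thread
    pool.shutdown()
    return pool.awaitTermination(graceMs, TimeUnit.MILLISECONDS)
}
```

If the while condition never checked the flag, cancel(true) would still mark the Future as cancelled, but the thread would keep computing: the same limitation withTimeout has around a blocking calculate().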
EDIT: regarding the comment about cancelling any job (including non-cancellable/non-suspending ones):
No, there is no magic here. Making a computation truly cancellable is very difficult, no matter what framework you use. Notoriously, Java has Thread.stop(), which seems to do what you want, but it was deprecated because stopping a thread at an arbitrary point can leave shared objects in an inconsistent state.
I tried to use coroutines to solve the simpler problem of no longer submitting new jobs after the timeout, while letting jobs that started before the timeout run far beyond it without being cancelled or interrupted. I spent some time on it and could not find an easy solution with coroutines. It could be done using the standard Java concurrency constructs.
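For that simpler variant (stop starting new jobs once the deadline passes, but let jobs that already started finish), one way with standard Java concurrency constructs is to have each task check the deadline at the moment a worker picks it up. This is a sketch, not the answer author's code; runUntilDeadline and DeadlineReport are hypothetical names, and parallelism plays the role of the 5-at-a-time limit from the question.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Finished results, plus how many jobs were skipped because they would have
// started after the deadline.
data class DeadlineReport<T>(val results: List<T>, val skipped: Int)

// Runs `jobs` with at most `parallelism` at a time. A job that has not STARTED by
// the deadline is skipped; a job that started in time runs to completion, even
// if that takes it well past the deadline (nothing is interrupted).
fun <T : Any> runUntilDeadline(jobs: List<() -> T>, parallelism: Int, timeoutMs: Long): DeadlineReport<T> {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    val pool = Executors.newFixedThreadPool(parallelism)
    try {
        val futures = jobs.map { job ->
            pool.submit(Callable<T?> {
                // Checked when a worker dequeues the job, not at submit time.
                if (System.nanoTime() - deadline > 0) null else job()
            })
        }
        val outcomes = futures.map { it.get() } // waits for started jobs to finish
        return DeadlineReport(outcomes.filterNotNull(), outcomes.count { it == null })
    } finally {
        pool.shutdown()
    }
}
```

Because nothing is interrupted, the call can return well after the deadline if a job that started in time runs long; invokeAll with a timeout makes the opposite trade-off.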