I am trying to process a tree of data objects. Each tree leaf is supposed to be processed through a function using a coroutine. The whole process should be done using a fixed size threadpool.
So I came up with this:
val node = an instance of WorkspaceEntry (tree structure)
val localDispatcher = newFixedThreadPoolContext(16)
fun main() {
val job = SupervisorJob()
val scope = CoroutineScope(localDispatcher + job)
handleEntry(node, scope)
runBlocking {
job.join()
}
}
The handleEntry method recursively launches a child job in the supervisor for each tree leaf.
The child jobs of the supervisor all complete successfully, but the join never returns. Am I understanding this wrong?
Edit: HandleEntry function
private fun handleEntry(workspaceEntry: WorkspaceEntry, scope: CoroutineScope) {
if (workspaceEntry is FileEntry) {
scope.launch {
FileTypeRegistry.processFile(workspaceEntry.fileBlob)
}
} else {
workspaceEntry.children.forEach { child -> handleEntry(child, scope) }
}
}
It seems the Job that is used to create CoroutineContext (in your case SupervisorJob) is not intended for waiting child coroutines to finish, so you can't use job.join(). I guess the main intent of that Job is to cancel child coroutines. Changing runBlocking block to the following will work:
runBlocking {
job.children.forEach {
it.join()
}
}
You have mixed two roles:
You need both, like this:
val masterJob = SupervisorJob()
val scope = CoroutineScope(localDispatcher + masterJob)
val unitOfWork = scope.launch { handleEntry(node, scope) }
runBlocking { unitOfWork.join() }
The above code doesn't really motivate the existence of the master job because you start just one child job from it, but it may make sense in a wider picture, where you have some context from which you launch many jobs, and want to be able to write
masterJob.cancel()
to cancel everything before it's done.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With