
How to join a Kotlin SupervisorJob

I am trying to process a tree of data objects. Each tree leaf should be processed by a function running in its own coroutine, and all of the work should run on a fixed-size thread pool.

So I came up with this:

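val node: WorkspaceEntry = TODO("an instance of WorkspaceEntry (tree structure)")
// newFixedThreadPoolContext also requires a thread name; "worker" is a placeholder
val localDispatcher = newFixedThreadPoolContext(16, "worker")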

fun main() {
    val job = SupervisorJob()
    val scope = CoroutineScope(localDispatcher + job)
    handleEntry(node, scope)

    runBlocking {
        job.join()
    }
}

The handleEntry method recursively launches a child job in the supervisor for each tree leaf.

The child jobs of the supervisor all complete successfully, but the join never returns. Am I understanding this wrong?

Edit: the handleEntry function

private fun handleEntry(workspaceEntry: WorkspaceEntry, scope: CoroutineScope) {
    if (workspaceEntry is FileEntry) {
        scope.launch {
            FileTypeRegistry.processFile(workspaceEntry.fileBlob)
        }
    } else {
        workspaceEntry.children.forEach { child -> handleEntry(child, scope) }
    }
}
J Horseman asked Nov 08 '25 10:11



2 Answers

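It seems the Job used to create the CoroutineScope (in your case a SupervisorJob) is not intended for waiting on child coroutines. A job created with SupervisorJob() stays active until it is explicitly completed or cancelled, so job.join() keeps suspending even after every child has finished. I guess the main intent of that Job is to cancel the child coroutines. Joining the children instead will work; change the runBlocking block to the following: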

runBlocking {
    job.children.forEach {
        it.join()
    }
}
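
As an alternative to joining each child, the supervisor itself can be told that no more work is coming. SupervisorJob() returns a CompletableJob, and calling complete() on it lets it finish once its existing children have finished, after which join() returns. The following is only a minimal sketch under assumptions: it needs kotlinx-coroutines-core on the classpath, and Dispatchers.Default, the runAndJoin helper and the delay(10) "work" are hypothetical stand-ins for the question's thread pool and file processing.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: launches `tasks` children under a supervisor job,
// waits for all of them, and returns how many ran to the end.
fun runAndJoin(tasks: Int): Int {
    val job = SupervisorJob()
    val scope = CoroutineScope(Dispatchers.Default + job)
    val processed = AtomicInteger(0)

    repeat(tasks) {
        scope.launch {
            delay(10) // placeholder for the real per-file work
            processed.incrementAndGet()
        }
    }

    runBlocking {
        // Without complete(), the supervisor stays active and join() hangs.
        // complete() moves it to "completing"; it becomes complete once
        // all children launched above are done.
        job.complete()
        job.join()
    }
    return processed.get()
}

fun main() {
    println(runAndJoin(100))
}
```

This only fits when every child has already been launched before complete() is called, which holds for the question's code because handleEntry returns before runBlocking starts.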
Sergey answered Nov 11 '25 17:11



You have mixed two roles:

  1. the master job found in the coroutine scope that never completes on its own and is used to control the lifecycle of everything else
  2. the job corresponding to a unit of work, possibly decomposed into more child jobs

You need both, like this:

val masterJob = SupervisorJob()
val scope = CoroutineScope(localDispatcher + masterJob)

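// Pass the launched coroutine's own scope (`this`) so the leaf coroutines become
// children of unitOfWork; with the outer `scope` they would attach to masterJob
// and unitOfWork.join() could return before they finish.
val unitOfWork = scope.launch { handleEntry(node, this) }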
runBlocking { unitOfWork.join() }

The above code doesn't really motivate the existence of the master job because you start just one child job from it, but it may make sense in a wider picture, where you have some context from which you launch many jobs, and want to be able to write

masterJob.cancel()

to cancel everything before it's done.
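
To make the two roles concrete, here is a self-contained sketch, not the asker's actual code. The Entry, Leaf and Dir types, the handle and processTree functions, and the pool size of 4 are all hypothetical stand-ins for WorkspaceEntry, FileEntry, handleEntry and the 16-thread pool. It assumes kotlinx-coroutines-core on the classpath. The unit of work hands its own scope (`this`) down the tree, so each leaf coroutine is its child and joining the unit of work waits for all of them.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical tree types standing in for WorkspaceEntry / FileEntry.
sealed interface Entry
data class Leaf(val name: String) : Entry
data class Dir(val children: List<Entry>) : Entry

// Launches one coroutine per leaf in whatever scope it is given.
fun handle(entry: Entry, scope: CoroutineScope, process: (Leaf) -> Unit) {
    when (entry) {
        is Leaf -> scope.launch { process(entry) }
        is Dir -> entry.children.forEach { handle(it, scope, process) }
    }
}

fun processTree(root: Entry, process: (Leaf) -> Unit) {
    val pool = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
    val masterJob = SupervisorJob()                 // role 1: lifecycle control
    val scope = CoroutineScope(pool + masterJob)
    try {
        // role 2: one unit of work whose children are the leaf coroutines
        val unitOfWork = scope.launch { handle(root, this, process) }
        runBlocking { unitOfWork.join() }           // returns after all leaves finish
    } finally {
        masterJob.cancel() // tears down anything still attached to the scope
        pool.close()       // releases the pool threads
    }
}

fun main() {
    val tree = Dir(listOf(Leaf("a"), Dir(listOf(Leaf("b"), Leaf("c")))))
    val count = AtomicInteger(0)
    processTree(tree) { count.incrementAndGet() }
    println(count.get())
}
```

One design consequence of this layout: the leaves are children of an ordinary launched job rather than of the supervisor, so a leaf that throws cancels its siblings within that unit of work, while other units launched from the same master job are unaffected.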

Marko Topolnik answered Nov 11 '25 17:11



