
Fan-out / fan-in - closing result channel

I'm producing items, consuming them from multiple coroutines, and pushing the results to resultChannel. The producer closes its channel after the last item.

The code never finishes because resultChannel is never closed. How can I detect that all consumers are done and end the iteration properly, so that hasNext() returns false?

val inputData = (0..99).map { "Input$it" }
val threads = 10

val bundleProducer = produce<String>(CommonPool, threads) {
    inputData.forEach { item ->
        send(item)
        println("Producing: $item")
    }

    println("Producing finished")
    close()
}

val resultChannel = Channel<String>(threads)

repeat(threads) {
    launch(CommonPool) {
        bundleProducer.consumeEach {
            println("CONSUMING $it")
            resultChannel.send("Result ($it)")
        }
    }
}

val iterator = object : Iterator<String> {
    val iterator = resultChannel.iterator()
    override fun hasNext() = runBlocking { iterator.hasNext() }
    override fun next() = runBlocking { iterator.next() }
}.asSequence()

println("Starting iteration...")

val result = iterator.toList()

println("finish: ${result.size}")
atok asked Jul 31 '17 08:07



1 Answer

You can run a coroutine that waits for the consumers to finish and then closes the resultChannel.

First, rewrite the code that starts the consumers to save the Jobs:

val jobs = (1..threads).map {
    launch(CommonPool) {
        bundleProducer.consumeEach {
            println("CONSUMING $it")
            resultChannel.send("Result ($it)")
        }
    }
}

And then run another coroutine that closes the channel once all the Jobs are done:

launch(CommonPool) {
    jobs.forEach { it.join() }
    resultChannel.close()
}
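
For reference, here is a minimal self-contained sketch of the same idea, assuming a current kotlinx.coroutines 1.x release, where Dispatchers.Default takes the place of CommonPool and coroutines are launched inside a scope. The helper name fanOutFanIn and its parameters are hypothetical and only serve the illustration; the plain Channel plus launch producer stands in for produce from the question.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: fans `input` out to `workers` coroutines and collects all results.
fun fanOutFanIn(input: List<String>, workers: Int): List<String> = runBlocking {
    val source = Channel<String>(workers)
    val results = Channel<String>(workers)

    // Producer: sends every item, then closes `source` so the workers' loops can end.
    launch(Dispatchers.Default) {
        input.forEach { source.send(it) }
        source.close()
    }

    // Fan-out: each worker receives until `source` is closed and drained.
    val jobs = List(workers) {
        launch(Dispatchers.Default) {
            for (item in source) results.send("Result ($item)")
        }
    }

    // Fan-in: close `results` only after every worker has finished.
    launch {
        jobs.joinAll()
        results.close()
    }

    // This loop ends once `results` is closed and drained.
    val collected = mutableListOf<String>()
    for (result in results) collected += result
    collected
}

fun main() {
    val result = fanOutFanIn((0..99).map { "Input$it" }, workers = 10)
    println("finish: ${result.size}")
}
```

No single worker can close the results channel safely, because the others may still be sending, which is why the close happens in a separate coroutine after all the Jobs are joined.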
hotkey answered Nov 08 '22 17:11
