Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Bridge channel to a sequence

This code is based on Coroutines guide example: Fan-out

val inputProducer = produce<String>(CommonPool) {
    (0..inputArray.size).forEach {
        send(inputArray[it])
    }
}

val resultChannel = Channel<Result>(10)

repeat(threadCount) {
    launch(CommonPool) {
        inputProducer.consumeEach {
            resultChannel.send(getResultFromData(it))
        }
    }
}

What is the right way to create a Sequence<Result> that will provide results?

like image 706
atok Avatar asked Jul 28 '17 08:07

atok


1 Answers

You can get the channel .iterator() from the ReceiveChannel and then wrap that channel iterator into a Sequence<T>, implementing its normal Iterator<T> that blocks waiting for the result on each request:

fun <T> ReceiveChannel<T>.asSequence(context: CoroutineContext) =
    Sequence {
        val iterator = iterator()
        object : AbstractIterator<T>() {
            override fun computeNext() = runBlocking(context) {
                if (!iterator.hasNext())
                    done() else
                    setNext(iterator.next())
            }
        }
    }

val resultSequence = resultChannel.asSequence(CommonPool)
like image 184
hotkey Avatar answered Oct 02 '22 11:10

hotkey