
How do you do a parallel fold on a Kotlin sequence?

As an exercise to help me learn Kotlin, I am writing an application in which I have a large sequence of objects, and I want to find the "best" one according to a set of criteria, for which the obvious way is to use a fold.

I have done the same thing in other languages, notably Java and Haskell, and in both of those I can achieve a big improvement in performance by doing a parallel fold - that is, splitting the sequence into chunks, folding each chunk separately and in parallel, and then combining the results.

In Java (assuming that objs is a Stream containing the objects I want to fold), this is as simple as:

objs.parallel().reduce(this::findBetter);

and the chunking and parallel folds are then handled for me.

I can't work out how to do this in Kotlin. I know that splitting the sequence into a sequence of chunks is easy using the chunked method, and I can then convert that sequence to a flow of chunks. My thought was that the next stage would be to map each chunk in the flow to the result of asynchronously folding that chunk; then I could fold the flow of intermediate results to get my final result. But everything I've tried fails to compile at some point.

Is this a good approach, and if so how could it be coded - in particular the bit about asynchronously folding the chunk?

asked Oct 11 '25 by Jeremy Hicks


1 Answer

This question is actually trickier than it seems. It sounds like a simple request for a feature that should just be there and just work. However, both sequences and flows are by design sequential and ordered. Concurrent processing isn't their goal, and they don't support such a feature. I believe there are plans to provide it for flows, but we are not there yet.

Also, we need to be aware that adding such a feature is not that straightforward, because there is no single way to do it. If the sequence/flow is ordered, should we ignore ordering entirely for parallel processing, or do we need to provide some ordering guarantees? How should we split the work between workers? How should we schedule the concurrent tasks: threads, executors, coroutines? What's the expected level of concurrency? Etc.

Let's assume we want to use coroutines, we ignore ordering entirely, and we chunk the data as you said. The simplest solution is based on collections: we chunk the data, process each chunk asynchronously, then join:

suspend fun <T> Iterable<T>.parallelReduce(chunkSize: Int, operation: (T, T) -> T): T = coroutineScope {
    chunked(chunkSize)
        .map { async { it.reduce(operation) } }
        .awaitAll()
        .reduce(operation)
}

We can use this with a sequence via: seq.asIterable().parallelReduce(...). Of course, this solution consumes the whole sequence as an initial step. If the sequence is very large and we can't keep all items in memory, we need another solution.
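For instance, a minimal usage sketch (the Int elements and the findBetter combiner here are illustrative stand-ins for the question's objects, not code from the question). Note that async inherits the dispatcher of the surrounding scope, so for CPU-bound folds you'll want to run this on Dispatchers.Default to get actual parallelism:

```kotlin
import kotlinx.coroutines.*

suspend fun <T> Iterable<T>.parallelReduce(chunkSize: Int, operation: (T, T) -> T): T = coroutineScope {
    chunked(chunkSize)
        .map { async { it.reduce(operation) } }
        .awaitAll()
        .reduce(operation)
}

// Stand-in for the question's "find the best object" criterion.
fun findBetter(a: Int, b: Int): Int = maxOf(a, b)

fun main() = runBlocking {
    val objs = (1..100_000).asSequence()
    // Run on Dispatchers.Default so the chunk folds actually use
    // multiple threads; async inherits the caller's dispatcher.
    val best = withContext(Dispatchers.Default) {
        objs.asIterable().parallelReduce(chunkSize = 1_000, ::findBetter)
    }
    println(best) // prints 100000
}
```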

We can convert the sequence to a flow. A flow is quite similar to a sequence, but it allows coroutines to be used in its operators, and it also provides a way to split the pipeline into two concurrent stages by placing a buffer() in the middle:

suspend fun <T> Sequence<T>.parallelReduce(chunkSize: Int, concurrency: Int, operation: (T, T) -> T): T = coroutineScope {
    require(concurrency >= 2)
    chunked(chunkSize).asFlow()
        .map { async { it.reduce(operation) } }
        .buffer(concurrency - 2) // buffer of 0 already provides concurrency of 2
        .map { it.await() }
        .reduce(operation)
}

Essentially, this works similarly to the first example, but instead of scheduling all chunks straight away, it launches only a limited number of tasks at a time. Be aware that this solution still consumes results sequentially, not concurrently: if the first chunk takes much longer than the others, then even after subsequent chunks finish, their results won't be consumed and no new chunks will start until the first chunk unblocks the whole pipeline.
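A usage sketch of this flow-based variant (the numbers are illustrative; as before, it is run on Dispatchers.Default, since the asyncs inherit the caller's dispatcher):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun <T> Sequence<T>.parallelReduce(chunkSize: Int, concurrency: Int, operation: (T, T) -> T): T = coroutineScope {
    require(concurrency >= 2)
    chunked(chunkSize).asFlow()
        .map { async { it.reduce(operation) } }
        .buffer(concurrency - 2) // buffer of 0 already provides concurrency of 2
        .map { it.await() }
        .reduce(operation)
}

fun main() = runBlocking {
    val sum = withContext(Dispatchers.Default) {
        // 1..1_000_000 as a lazily generated sequence: the whole input
        // is never materialized at once, only chunkSize items at a time.
        generateSequence(1L) { it + 1 }.take(1_000_000)
            .parallelReduce(chunkSize = 10_000, concurrency = 4, Long::plus)
    }
    println(sum) // prints 500000500000
}
```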

Also, it feels a little strange to use flows like this. They were designed more for inter-component communication than as a concurrency tool, and launching coroutines inside flow.map() is unusual. Alternatively, we can do it entirely manually, using a channel and launching worker coroutines:

suspend fun <T> Sequence<T>.parallelReduce(chunkSize: Int, concurrency: Int, operation: (T, T) -> T): T = coroutineScope {
    val inputsChannel = produce { chunked(chunkSize).forEach { send(it) } }
    val resultsChannel = Channel<T>()
    val result = async {
        var acc = resultsChannel.receive()
        for (item in resultsChannel) {
            acc = operation(acc, item)
        }
        acc
    }
    coroutineScope {
        repeat(concurrency) {
            launch {
                for (inputs in inputsChannel) {
                    resultsChannel.send(inputs.reduce(operation))
                }
            }
        }
    }
    resultsChannel.close()
    result.await()
}

This solution both processes chunks and consumes results concurrently, so it doesn't have the drawback mentioned above.
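It is used the same way as the flow variant. A quick sanity check (reusing the channel-based definition from above; the data and combiner are illustrative) that it agrees with a plain sequential reduce:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

suspend fun <T> Sequence<T>.parallelReduce(chunkSize: Int, concurrency: Int, operation: (T, T) -> T): T = coroutineScope {
    // Feed chunks to the workers through one channel...
    val inputsChannel = produce { chunked(chunkSize).forEach { send(it) } }
    // ...and collect their partial results through another.
    val resultsChannel = Channel<T>()
    val result = async {
        var acc = resultsChannel.receive()
        for (item in resultsChannel) {
            acc = operation(acc, item)
        }
        acc
    }
    coroutineScope {
        repeat(concurrency) {
            launch {
                for (inputs in inputsChannel) {
                    resultsChannel.send(inputs.reduce(operation))
                }
            }
        }
    }
    resultsChannel.close()
    result.await()
}

fun main() = runBlocking {
    val data = (1..10_000).map { it.toLong() }
    val parallel = withContext(Dispatchers.Default) {
        data.asSequence().parallelReduce(128, 4, Long::plus)
    }
    check(parallel == data.reduce(Long::plus))
    println(parallel) // prints 50005000
}
```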

As a bonus: what I said about flows not supporting concurrent processing is not entirely true. There is one operator that actually provides it: flatMapMerge(). It doesn't fit the rest of the API very well, almost as if it was added by accident. Still, it makes concurrent processing very easy:

suspend fun <T> Sequence<T>.parallelReduce(chunkSize: Int, concurrency: Int, operation: (T, T) -> T): T = coroutineScope {
    chunked(chunkSize).asFlow()
        .flatMapMerge(concurrency) {
            flow { emit(it.reduce(operation)) }
        }
        .reduce(operation)
}

I consider this something of a trick. It is not very clear what this code does, and especially how it does it. But it works, it requires minimal code, and I believe it schedules the processing optimally.
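One practical note, as a runnable sketch: flatMapMerge is annotated @FlowPreview in kotlinx.coroutines (at the time of writing), so the compiler may warn unless you opt in:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@OptIn(FlowPreview::class) // flatMapMerge is still a preview API
suspend fun <T> Sequence<T>.parallelReduce(chunkSize: Int, concurrency: Int, operation: (T, T) -> T): T =
    chunked(chunkSize).asFlow()
        .flatMapMerge(concurrency) { chunk ->
            // Each chunk becomes a one-element flow; flatMapMerge collects
            // up to `concurrency` of these inner flows at the same time.
            flow { emit(chunk.reduce(operation)) }
        }
        .reduce(operation)

fun main() = runBlocking {
    val best = withContext(Dispatchers.Default) {
        (1..100_000).asSequence().parallelReduce(1_000, 4) { a, b -> maxOf(a, b) }
    }
    println(best) // prints 100000
}
```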

answered Oct 15 '25 by broot