Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kotlin: Is there a tool that allows me to control parallelism when executing suspend functions?

I'm trying to execute certain suspend function multiple times, in such a way that never more than N of these are being executed at the same time.

For those acquainted with Akka and Scala Streaming libraries, something like mapAsync.

I did my own implementation using one input channel (as in kotlin channels) and N output channels. But it seems cumbersome and not very efficient.

The code I'm currently using is somewhat like this:

val inChannel = Channel<T>()
val outChannels = (0..n).map{
  Channel<T>()
}
launch{
   var i = 0
   for(t in inChannel){
     
     outChannels[i].offer(t)
     i = ((i+1)%n)
   }
}
outChannels.forEach{outChannel ->
  launch{
     for(t in outChannel){
        fn(t)
     }
  }
}

Of course it has error management and everything, but still...

Edit: I did the following test, and it failed.

test("Parallelism is correctly capped") {
            val scope = CoroutineScope(Dispatchers.Default.limitedParallelism(3))
            var num = 0
            (1..100).map {
                scope.launch {
                    num ++
                    println("started $it")
                    delay(Long.MAX_VALUE)
                }
            }

            delay(500)
            assertEquals(3,num)

        }
like image 642
Alejandro Navas Avatar asked Jan 31 '26 02:01

Alejandro Navas


1 Answers

Your question, as asked, calls for @marstran's answer. If what you want is that no more than N coroutines are being actively executed at any given time (in parallel), then limitedParallelism is the way to go:

val maxThreads: Int = TODO("some max number of threads")
val limitedDispatcher = Dispatchers.Default.limitedParallelism(maxThreads)

elements.forEach { elt ->
    scope.launch(limitedDispatcher) {
        doSomething(elt)
    }
}

Now, if what you want is to even limit concurrency, so that at most N coroutines are run concurrently (potentially interlacing), regardless of threads, you could use a Semaphore instead:

val maxConcurrency: Int = TODO("some max number of concurrency coroutines")
val semaphore = Semaphore(maxConcurrency)

elements.forEach { elt ->
    scope.async {
        semaphore.withPermit {
            doSomething(elt)
        }
    }
}

You can also combine both approaches.

like image 87
Joffrey Avatar answered Feb 03 '26 00:02

Joffrey



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!