I have the following code:
val context = newFixedThreadPoolContext(nThreads = 10, name = "myThreadPool")
val total = 1_000_000 // can be another number as well
val maxLimit = 1_000
return runBlocking {
    (0..total step maxLimit).map {
        async(context) {
            val offset = it
            val limit = it + maxLimit
            blockingHttpCall(offset, limit)
        }
    }.flatMap {
        it.await()
    }.associateBy {
        ...
    }.toMutableMap()
}
I would like only 10 calls to happen simultaneously to the blocking api.
However, the code above doesn't seem to behave as I expect (I think all the calls start immediately), or at least I can't tell whether it does.
What is the correct way to implement it?
Will the same solution work if I use the async API of Retrofit?
I don't know your exact case, but the easiest way is to use the OkHttp API to configure the concurrency level. By default, OkHttp's Dispatcher allows up to 64 concurrent requests overall and at most 5 per host, but you can apply your own strategy by passing your own Dispatcher instance to OkHttpClient.Builder.
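As a sketch (OkHttp 3.x/4.x API; the limit of 10 here is arbitrary, chosen to match the question):

```kotlin
import okhttp3.Dispatcher
import okhttp3.OkHttpClient

fun buildLimitedClient(): OkHttpClient {
    // OkHttp's defaults are 64 concurrent requests overall and 5 per host;
    // override them to cap concurrency at 10.
    val dispatcher = Dispatcher().apply {
        maxRequests = 10
        maxRequestsPerHost = 10
    }
    return OkHttpClient.Builder()
        .dispatcher(dispatcher)
        .build()
}
```

Note that these limits only throttle asynchronous calls made via Call.enqueue(); synchronous Call.execute() calls bypass the Dispatcher's queue.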
Of course, you can also use coroutines.
Your implementation is incorrect if you create a coroutine dispatcher for each item: to share one pool of threads, all the coroutines must use the same dispatcher, so make sure newFixedThreadPoolContext is created once, outside the loop (otherwise you end up with 1000 dispatchers, each with 10 threads).
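For illustration, here is a runnable sketch of that pattern, with the dispatcher created once and shared by every coroutine. blockingHttpCall is a hypothetical stand-in that just sleeps, and the counters exist only to observe that the thread pool caps concurrency at 10:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the real blocking HTTP call.
fun blockingHttpCall(offset: Int, limit: Int): List<Int> {
    Thread.sleep(50) // simulate network latency
    return (offset until limit).toList()
}

val inFlight = AtomicInteger(0)
val maxObserved = AtomicInteger(0)

fun fetchAllBatches(total: Int, batchSize: Int): List<Int> {
    // Created ONCE; its 10 threads are what cap the number of
    // simultaneous blocking calls, even though all coroutines start eagerly.
    val context = newFixedThreadPoolContext(nThreads = 10, name = "myThreadPool")
    return runBlocking {
        (0 until total step batchSize).map { offset ->
            async(context) {
                val now = inFlight.incrementAndGet()
                maxObserved.accumulateAndGet(now, ::maxOf)
                try {
                    blockingHttpCall(offset, offset + batchSize)
                } finally {
                    inFlight.decrementAndGet()
                }
            }
        }.flatMap { it.await() }
    }.also { context.close() }
}

fun main() {
    val results = fetchAllBatches(total = 2_000, batchSize = 100)
    println("fetched=${results.size} maxConcurrent=${maxObserved.get()}")
}
```

All 20 coroutines are created up front, but at most 10 blocking calls ever run at the same time, because only 10 threads exist to run them.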
But I don't recommend combining coroutines with blocking calls. It's better to configure OkHttp's concurrency (it's more flexible) and use coroutines with non-blocking calls (you can write your own adapter or use an existing library such as kotlin-coroutines-retrofit). This lets you mix your HTTP requests with UI code or other tasks.
So if you use a non-blocking API plus OkHttp's internal concurrency, you don't need any special code to control concurrency. Of course, you can still limit the number of concurrent calls as in your example above (with the fixed dispatcher created once), but I don't think it makes much sense: that way you can only decrease the concurrency level, not increase it.
After moving to a non-blocking API, you can simply run all your coroutines in parallel on any coroutine dispatcher (even on the UI thread) and await the results without blocking.
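If you do ever need an explicit cap with non-blocking calls, kotlinx.coroutines ships a Semaphore for exactly this. A minimal sketch, where suspendingHttpCall is a hypothetical non-blocking stand-in for a suspending Retrofit call:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger

val active = AtomicInteger(0)
val peak = AtomicInteger(0)

// Hypothetical non-blocking stand-in for a suspending HTTP call.
suspend fun suspendingHttpCall(offset: Int, limit: Int): List<Int> {
    val now = active.incrementAndGet()
    peak.accumulateAndGet(now, ::maxOf)
    delay(50) // suspends without blocking a thread
    active.decrementAndGet()
    return (offset until limit).toList()
}

fun fetchWithCap(total: Int, batchSize: Int, cap: Int): List<Int> = runBlocking {
    val semaphore = Semaphore(cap) // at most `cap` calls in flight
    (0 until total step batchSize).map { offset ->
        async {
            semaphore.withPermit { suspendingHttpCall(offset, offset + batchSize) }
        }
    }.flatMap { it.await() }
}

fun main() {
    val results = fetchWithCap(total = 2_000, batchSize = 100, cap = 10)
    println("fetched=${results.size} peakConcurrency=${peak.get()}")
}
```

Because the calls suspend instead of blocking, no dedicated thread pool is needed: the semaphore alone bounds how many calls are in flight.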
Also, controlling concurrency implicitly through the OkHttpClient configuration looks like the better approach architecturally: you can have DI code that configures Retrofit + OkHttp and provides it to your client code with a preconfigured concurrency policy. Of course, you can achieve this with other approaches, but this one seems more natural to me.