
How to limit concurrent calls with a Retrofit blocking client and coroutines

I have the following code:

val context = newFixedThreadPoolContext(nThreads = 10, name="myThreadPool")
val total = 1_000_000 //can be other number as well
val maxLimit = 1_000
return runBlocking {
  (0..total step maxLimit).map {
    async(context) {
      val offset = it
      val limit = it + maxLimit
      blockingHttpCall(offset, limit)
    }
  }.flatMap {
    it.await()
  }.associateBy {
    ...
  }.toMutableMap()
}

I want at most 10 calls to the blocking API to run simultaneously. However, the code above does not seem to behave as I expect (I think all calls start immediately), or at least I can't tell whether it does.
What is the correct way to implement this? Will the same solution work if I use the async API of Retrofit?

oshai asked Sep 24 '17 18:09


1 Answer

I don't know your exact case, but the easiest way is to use the OkHttp API to configure the concurrency level. OkHttp already ships with a default concurrency strategy.

You can also apply your own strategy by setting your own Dispatcher instance on OkHttpClient.Builder.

Of course, you can also use coroutines.
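As a minimal sketch of what a custom Dispatcher could look like: the helper names `limitedDispatcher` and `limitedClient` are made up for illustration, and setting both limits to the same value is an assumption, not something the question requires. As far as I know, OkHttp's defaults are 64 concurrent requests overall and 5 per host, and these limits apply to calls started with `enqueue` (the asynchronous path), not to synchronous `execute()` calls.

```kotlin
import okhttp3.Dispatcher
import okhttp3.OkHttpClient

// Hypothetical helper: a Dispatcher that allows at most `maxConcurrent`
// asynchronous calls in flight, both overall and per host.
fun limitedDispatcher(maxConcurrent: Int): Dispatcher =
    Dispatcher().apply {
        maxRequests = maxConcurrent
        maxRequestsPerHost = maxConcurrent
    }

// Hypothetical helper: an OkHttpClient that uses the limited Dispatcher.
// Pass this client to Retrofit.Builder().client(...) so every enqueued
// Retrofit call shares the same limit.
fun limitedClient(maxConcurrent: Int): OkHttpClient =
    OkHttpClient.Builder()
        .dispatcher(limitedDispatcher(maxConcurrent))
        .build()
```

With `limitedClient(10)`, additional asynchronous calls should wait in OkHttp's queue until one of the 10 running calls finishes.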

Your current implementation is incorrect because you create a coroutine dispatcher for each item. To share one pool of threads, all the coroutines must use the same dispatcher, so just move the newFixedThreadPoolContext creation outside of the loop (as written, you have 1000 dispatchers, each with 10 threads).
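A rough sketch of the shared-dispatcher approach, under a few assumptions: `fetchAll` is a hypothetical helper, the `blockingCall` parameter stands in for the question's `blockingHttpCall`, `limit` is treated as a page size, and the pool is built with `Executors.newFixedThreadPool(...).asCoroutineDispatcher()` instead of `newFixedThreadPoolContext`.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs blocking page requests on ONE shared pool,
// so at most `maxConcurrent` of them execute at the same time.
fun <T> fetchAll(
    total: Int,
    pageSize: Int,
    maxConcurrent: Int,
    blockingCall: (offset: Int, limit: Int) -> List<T>
): List<T> {
    // Created once, outside the loop, and shared by every coroutine.
    val pool = Executors.newFixedThreadPool(maxConcurrent).asCoroutineDispatcher()
    // `use` closes the dispatcher (and its threads) when the work is done.
    return pool.use { dispatcher ->
        runBlocking {
            (0 until total step pageSize)
                .map { offset -> async(dispatcher) { blockingCall(offset, pageSize) } }
                .awaitAll()   // wait for every page, preserving page order
                .flatten()
        }
    }
}
```

All coroutines are created immediately, but each blocking call occupies one pool thread, so the pool size is what caps the number of calls in flight.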

But I don't recommend combining coroutines with blocking calls. It is better to configure OkHttp concurrency (it's more flexible) and use coroutines with non-blocking calls (you can write your own adapter or use an existing library such as kotlin-coroutines-retrofit). This lets you mix your HTTP requests with UI code or other tasks.

So if you use a non-blocking API together with OkHttp's internal concurrency, you don't need special code to control concurrency. Of course, you can still limit the number of concurrent calls as in your example above (with the dispatcher construction fixed), but I don't think that makes much sense, because this way you can only decrease the concurrency level, not increase it.

After moving to a non-blocking API, you can run all your coroutines in parallel on any coroutine dispatcher (even on the UI thread) and wait for the results without blocking.

Also, implicit control of concurrency through OkHttpClient configuration looks like the better approach architecturally (you can have DI code that configures Retrofit + OkHttp and provides it to your client code with a preconfigured concurrency policy). Of course, you can achieve that in other ways, but this one feels more natural to me.
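To illustrate the non-blocking variant, here is a small sketch that assumes a suspending page fetcher is already available, for instance through an adapter library or a Retrofit version that supports `suspend` functions. `fetchAllSuspending` and its `fetchPage` parameter are hypothetical names, and no dedicated thread pool is involved: the caller's dispatcher is used, and any concurrency limit would come from the HTTP client configuration.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope

// Hypothetical helper: starts one coroutine per page in the caller's
// context and suspends (without blocking a thread) until all complete.
suspend fun <T> fetchAllSuspending(
    total: Int,
    pageSize: Int,
    fetchPage: suspend (offset: Int, limit: Int) -> List<T>
): List<T> = coroutineScope {
    (0 until total step pageSize)
        .map { offset -> async { fetchPage(offset, pageSize) } }
        .awaitAll()   // results come back in page order
        .flatten()
}
```

Because `fetchPage` suspends instead of blocking, this can be called from any coroutine, including one running on a UI dispatcher, without tying up that thread while requests are in flight.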

gildor answered Nov 20 '22 03:11
