Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kotlin - How to run n coroutines and wait for first m results or timeout?

I'm attempting to write a function that will start n coroutines and wait until the first m to complete. Should m coroutines fail to complete within some timeout, then all coroutines/jobs are canceled. My initial implementation for this is shown below, however I feel it can be improved. My initial thought was to use a parent job to run all other jobs under so the parent job can be cancelled and cascade down to the remaining children. However, this results in a TimeoutCancellationException that has to be caught.

How do I write a function to start n coroutines and wait until the first m to complete, or a timeout to occur before m coroutines can complete?

private suspend fun queryAllHosts(
        queryFactories: List<(query: String, pageIndex: Int) -> String>
        , query: String
        , pageIndex: Int
        , maxSuccessfulHosts: Int
        , queryTimeout: Long
        , requestTimeout: Long
): ArrayList<QueryResult<ResultModel>> {
    val results = ArrayList<QueryResult<ResultModel>>()
    val rootJob = Job()

    try {
        withTimeout(queryTimeout, TimeUnit.MILLISECONDS) {
            queryFactories.map {
                async(parent = rootJob) {
                    val pagedResult = queryHost(
                            it
                            , query
                            , pageIndex
                            , requestTimeout
                    )

                    if (pagedResult.isSuccessful()) {
                        results.add(pagedResult)
                    }

                    if (results.size == maxSuccessfulHosts) {
                        rootJob.cancelAndJoin()
                        return@async
                    }
                }
            }.awaitAll()
        }
    } catch (ex: TimeoutCancellationException) {
        Log.w(Tag, "Query timed out, successful queries: ${results.size}")
    } catch (ignored: JobCancellationException) {
        // Ignored
    } catch (ex: Exception) {
        Log.w(Tag, "Unexpected exception", ex)
    }

    return results
}

UPDATE

I didn't have any luck with the previously accepted answer due to concurrent modification exceptions. Below is a slight tweak on the original answer to avoid the concurrent modification exception, however it fails to respect the ticker timeout.

How does one avoid the concurrent modification exception and still respect the ticker timeout?

suspend fun <T> List<Deferred<T>>.awaitCount(count: Int, timeoutMs: Long): List<T> {
    require(count <= size)

    val Tag = "DERP"

    val toAwait = HashSet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticker = ticker(timeoutMs)

    forEach { deferred ->
        deferred.invokeOnCompletion {
            if (!deferred.isCompletedExceptionally) {
                Log.d(Tag, "(Completed) Value: ${deferred.getCompleted()}")
            } else {
                Log.d(Tag, "(Completed) Exception: $it")
            }
        }
    }

    val processed = HashSet<Deferred<T>>()

    val elapsedTime = measureTimeMillis {
        whileSelect {
            toAwait.minus(processed).forEach { deferred ->
                processed.add(deferred)
                deferred.onAwait {
                    toAwait.remove(deferred)
                    result.add(it)
                    result.size != count
                }
            }

            ticker.onReceive {
                toAwait.forEach { it.cancel() }
                false
            }
        }
    }

    Log.d(Tag, "Elapsed time: $elapsedTime")

    return result
}

Deferred instances are created with the following code now:

private fun makeRequest(
        url: String
        , timeoutMs: Int
): Document? = try {
    Jsoup.connect(url).timeout(timeoutMs).get()
} catch (ex: Exception) {
    null
}

private fun createAsyncRequests(
        queryFactories: List<(query: String, pageIndex: Int) -> String>
        , query: String
        , pageIndex: Int
        , timeoutMs: Int
): List<Deferred<QueryResult<TorrentResult>>> = queryFactories.map { queryFactory ->
    async(start = CoroutineStart.LAZY) {
        try {
            val url = queryFactory(query, pageIndex)
            makeRequest(
                    url
                    , timeoutMs
            ).getQueryResult(pageIndex, url)
        } catch (ex: Exception) {
            QueryResult<TorrentResult>(state = QueryResult.State.ERROR)
        }
    }
}

UPDATE

Log below show supplied timoutMs of 2,000ms not being respected as timeout occurs at 11,861ms:

2018-09-13 22:39:00.307 20475-20807/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.315 20475-20807/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.454 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|1/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://indiapirate.com/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.455 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.456 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.470 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|2/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://superbay.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.471 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.472 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.479 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|3/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://superbay.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.480 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.480 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.481 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|4/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebays.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.481 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.482 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.500 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|5/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebays.be/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.501 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.501 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.577 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|6/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebay.nz/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.577 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.578 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.602 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|7/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratebay6.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.602 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.603 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.604 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|8/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.604 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.605 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.621 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|9/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://uktpb.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.621 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.622 20475-20806/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.694 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|10/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxyproxyproxy.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.695 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.695 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.712 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|11/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://fastpirate.link/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.713 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.713 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.868 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|12/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://freepirate.eu/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:00.869 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:00.869 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.480 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|13/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://pirate.tel/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.481 20475-20968/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.482 20475-20968/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.649 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|14/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratesbay.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.649 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.650 20475-20798/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.780 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|15/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepirate.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:01.781 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:01.782 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.131 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|16/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:03.132 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.132 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.250 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|17/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:03.251 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.253 20475-20886/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.296 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|18/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://freeproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:03.296 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.297 20475-20802/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.441 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|19/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:03.442 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:03.443 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Starting deferred..
2018-09-13 22:39:04.826 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|20/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepirate.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:04.868 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|21/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxyproxy.fi/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:05.325 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|22/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:05.926 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|23/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.002 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|24/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.117 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|25/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:06.338 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|26/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:06.923 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|27/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebay.red/search/hobbit+1977/0/7
2018-09-13 22:39:07.214 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|28/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateproxy.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:07.625 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|29/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepirateway.click/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:08.398 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|30/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:08.431 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|31/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxybay.blue/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:08.684 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|32/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://piratepiratepirate.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.033 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|33/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://unblocktpb.org/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.759 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|34/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:09.879 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|35/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbunblock.net/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:09.961 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|36/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpbproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:10.554 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|37/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:10.715 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|38/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://Piratebayproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:10.855 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|39/50) Value: State: INVALID, Page: 0/0, Item Count: 0, Url: null
2018-09-13 22:39:11.014 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|40/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpb.fun/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.298 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|41/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebayproxy.in/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.313 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|42/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://tpb.review/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:11.932 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|43/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://proxybay.life/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:12.166 20475-20797/com.masterwok.tpbsearchandroid D/DERP: (Completed|44/50) Value: State: SUCCESS, Page: 0/0, Item Count: 14, Url: https://thepiratebayproxy.one/s/?q=hobbit+1977&page=0&orderby=99
2018-09-13 22:39:12.168 20475-20797/com.masterwok.tpbsearchandroid D/DERP: Elapsed time: 11861
2018-09-13 22:39:12.885 20475-20806/com.masterwok.tpbsearchandroid D/DERP: (Completed|45/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@b129740
2018-09-13 22:39:13.437 20475-20807/com.masterwok.tpbsearchandroid D/DERP: (Completed|46/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@c801679
2018-09-13 22:39:14.051 20475-20968/com.masterwok.tpbsearchandroid D/DERP: (Completed|47/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@3f89fbe
2018-09-13 22:39:14.154 20475-20798/com.masterwok.tpbsearchandroid D/DERP: (Completed|48/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@c7c181f
2018-09-13 22:39:14.658 20475-20886/com.masterwok.tpbsearchandroid D/DERP: (Completed|49/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@729e76c
2018-09-13 22:39:17.334 20475-20802/com.masterwok.tpbsearchandroid D/DERP: (Completed|50/50) Exception: kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally; job=LazyDeferredCoroutine{Cancelled}@ae2a135

Updated code with logging (I removed the TimeoutException and returned false instead):

suspend fun <T> List<Deferred<T>>.awaitCount(
        count: Int
        , timeoutMs: Long
): List<T> {
    require(count <= size)

    val Tag = "DERP"

    val toAwait = CopyOnWriteArraySet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticket = ticker(timeoutMs)
    var completedCount = 0

    forEach { deferred ->
        deferred.invokeOnCompletion {
            completedCount++

            if (deferred.isCompletedExceptionally) {
                Log.d(Tag, "(Completed|$completedCount/$size) Exception: $it")
            } else {
                Log.d(Tag, "(Completed|$completedCount/$size) Value: ${deferred.getCompleted()}")
            }
        }
    }

    val elapsedTime = measureTimeMillis {
        whileSelect {
            ticket.onReceive {
                toAwait.forEach { it.cancel() }
                false
            }

            toAwait.forEach { deferred ->
                Log.d(Tag, "Starting deferred..")
                deferred.onAwait {
                    toAwait.remove(deferred)
                    result.add(it)
                    result.size != count
                }
            }
        }
    }

    Log.d(Tag, "Elapsed time: $elapsedTime")

    return result
}
like image 605
masterwok Avatar asked Sep 05 '18 20:09

masterwok


People also ask

How do you wait for coroutine to finish Kotlin?

When we launch a coroutine in Kotlin using launch we are returned the resulting Job . We can wait for the coroutine to finish by calling join() on the Job.

Do coroutines run concurrently?

To cut a long story short, coroutines are like threads executing work concurrently. However, coroutines are not necessarily associated with any particular thread. A coroutine can initiate its execution on one thread, then suspend and continue its execution on a different thread.

Can coroutines in Kotlin be suspended and resumed mid execution?

Coroutines can suspend themselves, and the dispatcher is responsible for resuming them. To specify where the coroutines should run, Kotlin provides three dispatchers that you can use: Dispatchers. Main - Use this dispatcher to run a coroutine on the main Android thread.

Do Kotlin coroutines run in parallel?

Android Online Course for Professionals by MindOrks Similarly, we can do any type of background tasks in parallel using Kotlin Coroutines.


1 Answers

It can be improved by avoiding using additional launched task and root job at all.

kotlinx.coroutines has select clause for such complex operators, which perfectly fits your use-case. Moreover, it's easy to generalize:

suspend fun <T> List<Deferred<T>>.awaitCount(count: Int, timeoutMs: Long): List<T> {
    require(count <= size)

    val toAwait = CopyOnWriteArraySet<Deferred<T>>(this)
    val result = ArrayList<T>()
    val ticket = ticker(timeoutMs)

    whileSelect {
        toAwait.forEach { deferred ->
            deferred.onAwait {
                toAwait.remove(deferred)
                result.add(it)
                result.size != count
            }
        }

        ticket.onReceive {
            val e = TimeoutException()
            toAwait.forEach { it.cancel(e) }
            throw e
        }
    }

    return result
}

Then you can use it in queryAllHosts:

val queries = queryFactories.map { queryHost(...) }
return queries.awaitCount(maxSuccessfulHosts, queryTimeout)

You can adjust awaitCount in the way you want, e.g. specialize it for Retrofit and add isSuccessful check

like image 120
qwwdfsad Avatar answered Oct 09 '22 10:10

qwwdfsad