
How to asynchronously map over sequence

I want to iterate over a sequence of objects and return the first non-null of an async call.

The point is to perform some kind of async operation that might fail, and I have a series of fallbacks that I want to try in order, one after the other (i.e. lazily / not in parallel).

I've tried to do something similar to what I'd do if it were a sync call:

// ccs: List<CurrencyConverter>
override suspend fun getExchangeRateAsync(from: String, to: String) =
  ccs.asSequence()
    .map { it.getExchangeRateAsync(from, to) }
    .firstOrNull { it != null }
    ?: throw CurrencyConverterException()

IntelliJ complains:

Suspension functions can only be called within coroutine body

Edit: To clarify, this works as expected if mapping on a List, but I want to see how I'd do this on a sequence.

So I guess this is because the map lambda isn't suspended? But I'm not sure how to actually do that. I tried a bunch of different ways but none seemed to work. I couldn't find any examples.

If I re-write this in a more procedural style using a for loop with an async block, I can get it working:

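override suspend fun getExchangeRateAsync(from: String, to: String): BigDecimal {
    for (cc in ccs) {
        // Try each converter in order; await() suspends until this one finishes
        val res: BigDecimal? = async {
            cc.getExchangeRateAsync(from, to)
        }.await()

        if (res != null) {
            return res
        }
    }

    // Every converter returned null
    throw CurrencyConverterException()
}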

chroder asked Aug 22 '18 14:08


3 Answers

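You get this error because Sequence is lazy and its map is not an inline function, so the lambda you pass to it is an ordinary, non-suspending lambda and cannot call a suspend function. List.map is inline, so its lambda is compiled into the body of the enclosing suspend function, which is why the List version works.

You can avoid using Sequence by creating a list of lazy coroutines: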

// ccs: List<CurrencyConverter>
suspend fun getExchangeRateAsync(from: String, to: String) =
    ccs
    .map { async(start = CoroutineStart.LAZY) { it.getExchangeRateAsync(from, to) } }
    .firstOrNull { it.await() != null }
    ?.getCompleted() ?: throw Exception()

This compiles without errors and seems to work, but I'm not sure it is idiomatic.
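
Following the point above about inline functions, here is a minimal sketch of another way to stay on a List without creating any coroutines. It assumes Kotlin 1.5 or later, where the standard library's firstNotNullOfOrNull is available; because that function is inline, its lambda may call suspend functions, and it stops at the first non-null result. FakeConverter, firstRate and the call log are hypothetical stand-ins for the question's CurrencyConverter, not part of the original code.

```kotlin
import java.math.BigDecimal
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

class CurrencyConverterException : Exception("No converter returned a rate")

// Hypothetical converter: logs its name when called, then returns a fixed rate (null means failure).
class FakeConverter(
    private val name: String,
    private val rate: BigDecimal?,
    private val calls: MutableList<String>
) {
    suspend fun getExchangeRateAsync(from: String, to: String): BigDecimal? {
        delay(10) // stands in for a suspending network call
        calls += name
        return rate
    }
}

// firstNotNullOfOrNull is inline, so the suspend call inside the lambda is allowed.
// Converters are tried one after the other; iteration stops at the first non-null rate.
suspend fun firstRate(ccs: List<FakeConverter>, from: String, to: String): BigDecimal =
    ccs.firstNotNullOfOrNull { it.getExchangeRateAsync(from, to) }
        ?: throw CurrencyConverterException()

fun main() = runBlocking {
    val calls = mutableListOf<String>()
    val ccs = listOf(
        FakeConverter("a", null, calls),
        FakeConverter("b", BigDecimal("1.25"), calls),
        FakeConverter("c", BigDecimal("9.99"), calls)
    )
    println(firstRate(ccs, "USD", "EUR")) // prints 1.25
    println(calls)                        // prints [a, b]
}
```

The third converter is never called, which is the fallback behavior the question asks for.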

Dima Rostopira answered Oct 27 '22 07:10


I would suggest replacing Sequence with Flow. The Flow API and behavior are much the same as Sequence's, but its operators accept suspending lambdas.

https://kotlinlang.org/docs/reference/coroutines/flow.html

Code:

override suspend fun getExchangeRateAsync(from: String, to: String) =
    ccs.asFlow()
    .map { it.getExchangeRateAsync(from, to) }
    .firstOrNull { it != null }
    ?: throw CurrencyConverterException()
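
To check that the Flow version really tries the converters lazily and in order, a self-contained sketch along these lines may help. It assumes a kotlinx.coroutines version that provides Flow.firstOrNull with a predicate; StubConverter, firstRateViaFlow, the call log and the NoSuchElementException are hypothetical stand-ins, not the question's real types.

```kotlin
import java.math.BigDecimal
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical converter: logs its name when called, then returns a fixed rate (null means failure).
class StubConverter(
    private val name: String,
    private val rate: BigDecimal?,
    private val calls: MutableList<String>
) {
    suspend fun getExchangeRateAsync(from: String, to: String): BigDecimal? {
        delay(10) // stands in for a suspending network call
        calls += name
        return rate
    }
}

// A flow is cold and sequential: each element is mapped only when the collector asks for it,
// and firstOrNull cancels the upstream flow as soon as the predicate matches.
suspend fun firstRateViaFlow(ccs: List<StubConverter>, from: String, to: String): BigDecimal =
    ccs.asFlow()
        .map { it.getExchangeRateAsync(from, to) }
        .firstOrNull { it != null }
        ?: throw NoSuchElementException("No converter returned a rate")

fun main() = runBlocking {
    val calls = mutableListOf<String>()
    val ccs = listOf(
        StubConverter("a", null, calls),
        StubConverter("b", BigDecimal("1.25"), calls),
        StubConverter("c", BigDecimal("9.99"), calls)
    )
    println(firstRateViaFlow(ccs, "USD", "EUR")) // prints 1.25
    println(calls)                               // prints [a, b]
}
```
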

jlinhart answered Oct 27 '22 07:10


FWIW, I found the suggestion in "How to asynchronously map over sequence" to be very intuitive. The code at https://github.com/Kotlin/kotlin-coroutines-examples/blob/master/examples/suspendingSequence/suspendingSequence.kt defines SuspendingIterator, which allows next() to suspend, then builds SuspendingSequence on top of it. Unfortunately, you need to duplicate extension functions such as flatMap() and filter(), since SuspendingSequence can't be related to Sequence, but I did this and am much happier with the result than with using a Channel.

jrhy answered Oct 27 '22 07:10