I want to iterate over a sequence of objects and return the first non-null of an async call.
The point is to perform some kind of async operation that might fail, and I have a series of fallbacks that I want to try in order, one after the other (i.e. lazily / not in parallel).
I've tried to do something similar to what I'd do if it were a sync call:
// ccs: List<CurrencyConverter>
override suspend fun getExchangeRateAsync(from: String, to: String) =
ccs.asSequence()
.map { it.getExchangeRateAsync(from, to) }
.firstOrNull { it != null }
?: throw CurrencyConverterException()
IntelliJ complains:
Suspension functions can only be called within coroutine body
Edit: To clarify, this works as expected if mapping on a List, but I want to see how I'd do this on a sequence.
So I guess this is because the map lambda isn't suspended? But I'm not sure how to actually do that. I tried a bunch of different ways but none seemed to work. I couldn't find any examples.
If I re-write this in a more procedural style using a for
loop with an async block, I can get it working:
override suspend fun getExchangeRateAsync(from: String, to: String) {
for (cc in ccs) {
var res: BigDecimal? = async {
cc.getExchangeRateAsync(from, to)
}.await()
if (res != null) {
return res
}
}
throw CurrencyConverterException()
}
The map function An async version needs to do two things. First, it needs to map every item to a Promise with the new value, which is what adding async before the function does. And second, it needs to wait for all the Promises then collect the results in an Array.
If you use the async await function and console out the output, then you will find the arrays of promises that are still needed to be resolved. The map doesn't resolve the promises on its own but left the stuff for the developer to resolve. So, that means you can't use async-await in the map.
This makes the general pattern of an async map to be Promise.all (arr.map (async (...) => ...)). An async implementation doing the same as the sync one: The above implementation runs the iteratee function in parallel for each element of the array. This is usually fine, but in some cases, it might consume too much resources.
By using Promise.all you can make map and forEach work with async functions (i.e. Promises). To make filter, some and every work you can first use an async map (that in turn uses Promise.all) and then go through the true/false values and synchronously do the filtering/evaluation.
This is usually fine, but in some cases, it might consume too much resources. This can happen when the async function hits an API or consumes too much RAM that it’s not feasible to run too many at once. While an async map is easy to write, adding concurrency controls is more involved.
An async version needs to do two things. First, it needs to map every item to a Promise with the new value, which is what adding async before the function does. And second, it needs to wait for all the Promises then collect the results in an Array. Fortunately, the Promise.all built-in call is exactly what we need for step 2.
You are getting an error, because Sequence
is lazy by default and it's map
isn't an inline function, so it's scope isn't defined
You can avoid using Sequence
by creating a list of lazy coroutines
// ccs: List<CurrencyConverter>
suspend fun getExchangeRateAsync(from: String, to: String) =
ccs
.map { async(start = CoroutineStart.LAZY) { it.getExchangeRateAsync(from, to) } }
.firstOrNull { it.await() != null }
?.getCompleted() ?: throw Exception()
This doesn't give any errors and seems to be working. But I'm not sure it's an idiomatic way
I would suggest replacing Sequence with Flow. Flow api and behavior is pretty much same as for Sequence, but with suspending options.
https://kotlinlang.org/docs/reference/coroutines/flow.html
Code:
override suspend fun getExchangeRateAsync(from: String, to: String) =
ccs.asFlow()
.map { it.getExchangeRateAsync(from, to) }
.firstOrNull { it != null }
?: throw CurrencyConverterException()
FWIW, I found the suggestion in How to asynchronously map over sequence to be very intuitive. The code at https://github.com/Kotlin/kotlin-coroutines-examples/blob/master/examples/suspendingSequence/suspendingSequence.kt defines SuspendingIterator
which allows next()
to suspend, then builds SuspendingSequence
on top of it. Unfortunately, you need to duplicate extension functions like flatMap()
, filter()
, etc. since SuspendingSequence
can't be related to Sequence
, but I did this and am much happier with the result than using a Channel.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With