I have a List<Flow<T>>, and would like to generate a Flow<List<T>>. This is almost what combine does, except that combine waits for each and every Flow to emit an initial value, which is not what I want. Take this code for example:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val a = flow {
    repeat(3) {
        emit("a$it")
        delay(100)
    }
}
val b = flow {
    repeat(3) {
        delay(150)
        emit("b$it")
    }
}
val c = flow {
    delay(400)
    emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
    combine(flows) {
        it.toList()
    }.collect { println(it) }
}
With combine (and hence as-is), this is the output:
[a2, b1, c]
[a2, b2, c]
Whereas I'm interested in all the intermediary steps too. This is what I want from those three flows:
[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
Right now I have two workarounds, but neither of them is great. The first one is plain ugly and doesn't work with nullable types:
val flows = listOf(a, b, c).map {
    flow {
        emit(null)
        it.collect { emit(it) }
    }
}
runBlocking {
    combine(flows) {
        it.filterNotNull()
    }.collect { println(it) }
}
By forcing all the flows to emit a first, irrelevant value, the combine transformer is indeed called, and lets me remove the null values which I know are not actual values. Iterating on that, more readable but heavier:
sealed class FlowValueHolder {
    object None : FlowValueHolder()
    data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
    flow {
        emit(FlowValueHolder.None)
        it.collect { emit(FlowValueHolder.Some(it)) }
    }
}
runBlocking {
    combine(flows) {
        it.filterIsInstance(FlowValueHolder.Some::class.java)
            .map { it.value }
    }.collect { println(it) }
}
Now this one works just fine, but still feels like I'm overdoing stuff. Is there a method that I'm missing in the coroutines library?
How about this:
inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
    val array = Array(flows.size) {
        false to (null as T?) // first element stands for "present"
    }
    flows.forEachIndexed { index, flow ->
        launch {
            flow.collect { emittedElement ->
                array[index] = true to emittedElement
                send(array.filter { it.first }.map { it.second })
            }
        }
    }
}
It solves a few problems:
[] is not in the resulting Flow.
So, you won't notice any implementation-specific workarounds, because you don't have to deal with it during collection:
runBlocking {
    instantCombine(a, b, c).collect {
        println(it)
    }
}
Output:
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
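If the leading empty list from the question's wanted output matters, one possible alternative is to keep combine and prime every flow with a private marker through onStart. The following is only a sketch: Unset and combineFromStart are made-up names for illustration, not part of kotlinx.coroutines. Because the marker is a private object rather than null, flows that emit null should still work. With timed flows like the ones in the question, the first list is typically empty, although combine may merge emissions that arrive at the same moment.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.runBlocking

// Hypothetical marker meaning "this flow has not emitted a real value yet".
private object Unset

// Hypothetical helper: combine that starts emitting before every flow has a value.
fun <T> combineFromStart(flows: List<Flow<T>>): Flow<List<T>> {
    val primed: List<Flow<Any?>> = flows.map { source ->
        val widened: Flow<Any?> = source   // Flow is covariant, so no cast is needed
        widened.onStart { emit(Unset) }    // emit the marker before the first real value
    }
    return combine(primed) { latest: Array<Any?> ->
        // Drop the markers; everything that remains came from a source flow.
        @Suppress("UNCHECKED_CAST")
        val values = latest.filter { it !== Unset } as List<T>
        values
    }
}

fun main() = runBlocking {
    val fast = flow { emit("a0"); delay(100); emit("a1") }
    val slow = flow { delay(150); emit("b0") }
    combineFromStart(listOf(fast, slow)).collect { println(it) }
}
```

Compared with instantCombine, this variant accepts the List<Flow<T>> from the question directly and needs no reified type parameter.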
Edit: Updated answer to handle Flows which emit null values too.
* The used low-level array is thread-safe. It's as if you are dealing with single variables.
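The note above holds as long as each coroutine only writes its own slot. On a multi-threaded dispatcher, though, building the snapshot list could in principle overlap with a write from another coroutine. If that is a concern, a more defensive variant might guard both steps with a Mutex. This is a sketch under assumptions: NoValue and instantCombineList are hypothetical names, and taking a List instead of vararg is my own choice to match the question's List<Flow<T>>.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical marker for "no value yet"; lets real null emissions pass through.
private object NoValue

// Hypothetical List-based variant of instantCombine with a guarded snapshot.
fun <T> instantCombineList(flows: List<Flow<T>>): Flow<List<T>> = channelFlow {
    val latest = Array<Any?>(flows.size) { NoValue }
    val mutex = Mutex()
    flows.forEachIndexed { index, source ->
        launch {
            source.collect { value ->
                // Update the slot and send the snapshot as one guarded step.
                mutex.withLock {
                    latest[index] = value
                    @Suppress("UNCHECKED_CAST")
                    val snapshot = latest.filter { it !== NoValue } as List<T>
                    send(snapshot)
                }
            }
        }
    }
}

fun main() = runBlocking {
    val fast = flow { emit("a0"); delay(100); emit("a1") }
    val slow = flow { delay(150); emit("b0") }
    instantCombineList(listOf(fast, slow)).collect { println(it) }
}
```

Holding the lock across send also keeps snapshots in the order the updates happened, at the cost of serializing the producers.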