Given 2 or more flows with the same type, is there an existing Kotlin coroutine function to merge them, like the RX merge operator?
Currently I was considering this:
fun <T> merge(vararg flows: Flow<T>): Flow<T> = channelFlow {
    val flowJobs = flows.map { flow ->
        GlobalScope.launch { flow.collect { send(it) } }
    }
    flowJobs.joinAll()
}
but it seems somewhat clumsy.
This is now part of the Coroutines library (version 1.3.5 at the time of writing).
You use it like this:
val flowA = flow { emit(1) }
val flowB = flow { emit(2) }
merge(flowA, flowB).collect { println(it) } // Prints two integers
// or:
listOf(flowA, flowB).merge().collect { println(it) } // Prints two integers
You can read more in the source code
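For a runnable illustration, here is a minimal sketch of the built-in merge interleaving two flows that emit at different rates. The ticker helper, its labels, and the delay values are hypothetical and only there for the demo. The order in which values arrive depends on timing, so the sketch prints the result without promising a particular order.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits "<label><index>" count times, waiting periodMs before each value.
fun ticker(label: String, periodMs: Long, count: Int): Flow<String> = flow {
    repeat(count) { i ->
        delay(periodMs)
        emit("$label$i")
    }
}

// Collects both tickers concurrently through merge and gathers everything into a list.
fun mergedSample(): List<String> = runBlocking {
    merge(ticker("a", 100, 3), ticker("b", 130, 2)).toList()
}

fun main() {
    // Values from both flows arrive interleaved, in whatever order they were emitted.
    println(mergedSample())
}
```

The merged flow completes only after both source flows have completed, which is why toList() returns all five values.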
I'm not too familiar with flows yet, so this might be suboptimal. Anyway, I think you could create a flow of all your input flows, and then use flattenMerge to flatten them into a single flow again. Something like this:
fun <T> merge(vararg flows: Flow<T>): Flow<T> = flowOf(*flows).flattenMerge()
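One thing to be aware of with this approach: flattenMerge takes a concurrency parameter that limits how many inner flows are collected at once (as far as I know the default is 16), and it must be greater than zero. Below is a minimal sketch that sets the limit to the number of input flows so that all of them are collected concurrently. The name mergeAll is hypothetical, and the opt-in annotations are an assumption, because which marker flattenMerge requires depends on the library version.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical name. Collects every input flow concurrently, however many are passed in.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T> mergeAll(vararg flows: Flow<T>): Flow<T> =
    if (flows.isEmpty()) emptyFlow() // flattenMerge rejects a concurrency of 0
    else flowOf(*flows).flattenMerge(concurrency = flows.size)

fun main() = runBlocking {
    val merged = mergeAll(flowOf(1, 2), flowOf(3), flowOf(4, 5)).toList()
    // Sorted before printing because the merged order is not guaranteed.
    println(merged.sorted()) // prints [1, 2, 3, 4, 5]
}
```

With a single input flow the concurrency is 1, in which case flattenMerge simply collects that flow sequentially.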
Edit:
The merge function was added to kotlinx-coroutines in the 1.3.3 release. See here: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/merge.html
It may be late but I believe this may be a viable solution:
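// Uses channelFlow rather than flow: calling emit from a launched coroutine
// violates the flow invariant and fails at runtime, whereas send is safe
// to call concurrently.
fun <T> combineMerge(vararg flows: Flow<T>): Flow<T> = channelFlow {
    flows.forEach { flow ->
        // Each input flow is collected in its own child coroutine;
        // channelFlow completes once all of them have finished.
        launch {
            flow.collect { send(it) }
        }
    }
}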
fun <T> combineConcat(vararg flows: Flow<T>): Flow<T> = flow {
    flows.forEach { flow ->
        flow.collect {
            emit(it)
        }
    }
}