
Merging Kotlin flows

Given 2 or more flows with the same type, is there an existing Kotlin coroutine function to merge them, like the RX merge operator?

Currently I was considering this:

fun <T> merge(vararg flows: Flow<T>): Flow<T> = channelFlow {
    val flowJobs = flows.map { flow ->
        GlobalScope.launch { flow.collect { send(it) } }
    }
    flowJobs.joinAll()
}

but it seems somewhat clumsy.

Andy asked Aug 28 '19 13:08



3 Answers

A merge function is now part of the kotlinx.coroutines library (version 1.3.5 at the time of writing).

You use it like this:

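import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

val flowA = flow { emit(1) }
val flowB = flow { emit(2) }

fun main() = runBlocking {
    // collect is a suspending call, so it has to run inside a coroutine
    merge(flowA, flowB).collect { println(it) } // Prints two integers
    // or:
    listOf(flowA, flowB).merge().collect { println(it) } // Prints two integers
}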

You can read more in the source code
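
To make the concurrent behaviour visible, here is a minimal sketch, assuming kotlinx.coroutines 1.3.3 or later. The flows `odds` and `evens` and the helper `mergedValues` are hypothetical names used only for illustration; the delays exist purely to space the emissions out in time. Because merge collects all of its inputs at once, values arrive in the order they are emitted, not in the order the flows were passed in.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical sample flows; the delays only make the timing visible.
val odds: Flow<Int> = flow {
    emit(1)      // emitted immediately
    delay(200)
    emit(3)      // emitted roughly 200 ms after collection starts
}
val evens: Flow<Int> = flow {
    delay(100)
    emit(2)      // emitted roughly 100 ms after collection starts
}

// Collects the merged flow into a list so the arrival order can be inspected.
fun mergedValues(): List<Int> = runBlocking { merge(odds, evens).toList() }

fun main() {
    println(mergedValues()) // [1, 2, 3]
}
```

If the two flows were collected one after the other instead, the result would be 1, 3, 2.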

Jacques.S answered Nov 20 '22 13:11



I'm not too familiar with flows yet, so this might be suboptimal. Anyway, I think you could create a flow of all your input flows, and then use flattenMerge to flatten them into a single flow again. Something like this:

fun <T> merge(vararg flows: Flow<T>): Flow<T> = flowOf(*flows).flattenMerge()

Edit:

The merge function was added to kotlinx-coroutines in the 1.3.3 release. See: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/merge.html

marstran answered Nov 20 '22 14:11



It may be late but I believe this may be a viable solution:

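fun <T> combineMerge(vararg flows: Flow<T>): Flow<T> = channelFlow {
    // channelFlow allows sending from child coroutines. A plain flow { }
    // builder throws an IllegalStateException when emit is called from a
    // launched coroutine, because that breaks its context invariant.
    flows.forEach { flow ->
        launch {
            flow.collect { send(it) }
        }
    }
}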

fun <T> combineConcat(vararg flows: Flow<T>) = flow {
    flows.forEach {
        it.collect {
            emit(it)
        }
    }
}
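
The difference between the two approaches above can be sketched as follows, assuming kotlinx.coroutines 1.3.3 or later. The names `concatSketch`, `slow`, `fast`, `sequentialOrder` and `concurrentOrder` are hypothetical and exist only for this illustration. The concurrent case uses the library's own merge rather than a hand-written one, and the sequential helper uses emitAll as a shorter way to write the collect/emit loop.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects each flow to completion before starting the next.
fun <T> concatSketch(vararg flows: Flow<T>): Flow<T> = flow {
    flows.forEach { emitAll(it) }
}

// Hypothetical sample flows: one emits after a delay, the other immediately.
val slow: Flow<String> = flow {
    delay(100)
    emit("slow")
}
val fast: Flow<String> = flow { emit("fast") }

// Sequential: "fast" is not collected until "slow" has completed.
fun sequentialOrder(): List<String> = runBlocking { concatSketch(slow, fast).toList() }

// Concurrent: both flows are collected at once, so "fast" arrives first.
fun concurrentOrder(): List<String> = runBlocking { merge(slow, fast).toList() }

fun main() {
    println(sequentialOrder()) // [slow, fast]
    println(concurrentOrder()) // [fast, slow]
}
```

Concatenation preserves the argument order but waits for each flow to finish, so it never completes past an infinite flow; merging gives no ordering guarantee between flows but keeps all of them live.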
Lamberto Basti answered Nov 20 '22 13:11
