There is a zip function to zip two Flows. Is there something to zip three (or more) Flows together?
If not, can you help me implement an extension function for it? Something like:
flow.zip(flow2, flow3) { a, b, c ->
}
The simplest way to combine flows is to merge the elements from two flows into one. The elements are not modified, no matter which flow they originate from. To do this, use the top-level merge function. Note that with merge, the elements from one flow do not wait for the other flow.
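As a small illustration of the merge behaviour described above, here is a minimal sketch. It assumes the kotlinx.coroutines library is on the classpath; the flow contents are made up for the example. Because merge does not pair elements up, the result is sorted before printing so the output does not depend on interleaving.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val small = flowOf(1, 2, 3)   // illustrative data
    val large = flowOf(10, 20)    // illustrative data

    // merge emits every element from both flows, unmodified and unpaired.
    val merged = merge(small, large).toList()

    // The relative order between the two sources is not guaranteed, so sort first.
    println(merged.sorted()) // [1, 2, 3, 10, 20]
}
```

Unlike zip, nothing is dropped here when one flow is shorter than the other: all five elements come through.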
You can check the zip operator implementation and copy/emulate how it works, adapting it to your needs.
Test it and make any changes you need:
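import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow

@OptIn(ExperimentalCoroutinesApi::class)
fun <T1, T2, T3, R> Flow<T1>.zip(
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    transform: suspend (T1, T2, T3) -> R
): Flow<R> = channelFlow {
    // Collect each input flow into its own channel.
    val first: ReceiveChannel<T1> = produce {
        this@zip.collect {
            channel.send(it)
        }
    }
    val second: ReceiveChannel<T2> = produce {
        flow2.collect {
            channel.send(it)
        }
    }
    val third: ReceiveChannel<T3> = produce {
        flow3.collect {
            channel.send(it)
        }
    }

    // When the second or third input closes, cancel the other producers.
    (second as SendChannel<*>).invokeOnClose {
        if (!first.isClosedForReceive) first.cancel(MyFlowException())
        if (!third.isClosedForReceive) third.cancel(MyFlowException())
    }
    (third as SendChannel<*>).invokeOnClose {
        if (!first.isClosedForReceive) first.cancel(MyFlowException())
        if (!second.isClosedForReceive) second.cancel(MyFlowException())
    }

    val otherIterator = second.iterator()
    val anotherIterator = third.iterator()
    try {
        // For each value of the first flow, take one value from each of the others.
        first.consumeEach { value ->
            if (!otherIterator.hasNext() || !anotherIterator.hasNext()) {
                return@consumeEach
            }
            send(transform(value, otherIterator.next(), anotherIterator.next()))
        }
    } catch (e: MyFlowException) {
        // complete
    } finally {
        if (!second.isClosedForReceive) second.cancel(MyFlowException())
        if (!third.isClosedForReceive) third.cancel(MyFlowException())
    }
}

// Internal signal used to stop the remaining producers.
class MyFlowException : CancellationException()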
Usage:
flow1.zip(flow2, flow3) { a, b, c ->
//Do your work
}...
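If the channel-based version is more than you need, a simpler alternative is to apply the built-in two-flow zip twice. The sketch below is not the approach from the answer above. The name zip3 is hypothetical, chosen only to avoid clashing with the library's zip, and the sample flows are made up.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical helper: zips three flows by nesting the library's two-flow zip.
fun <T1, T2, T3, R> Flow<T1>.zip3(
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    transform: suspend (T1, T2, T3) -> R
): Flow<R> =
    this.zip(flow2) { a, b -> a to b }                   // pair up the first two flows
        .zip(flow3) { (a, b), c -> transform(a, b, c) }  // then add the third

fun main() = runBlocking {
    val result = flowOf(1, 2, 3)
        .zip3(flowOf("a", "b"), flowOf(true, false, true)) { n, s, flag -> "$n$s$flag" }
        .toList()

    // The zip ends with the shortest input (two elements here).
    println(result) // [1atrue, 2bfalse]
}
```

This allocates an intermediate Pair per element, which is usually negligible, and it extends to more flows by adding further zip stages.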