
Kotlin Coroutines zip three Flows

There is a zip function to zip two Flows. Is there something to zip three (or more) Flows together?

If not, can you help me implement an extension function for it? Something like:

flow.zip(flow2, flow3) { a, b, c -> 

}
Francis asked Mar 30 '20 12:03




1 Answer

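You can look at the implementation of the zip operator and adapt it to your needs. The version below follows the same approach for three flows: each flow is collected into its own channel, and the zipped flow completes as soon as any of the three completes.

Test it and adjust it as needed.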

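import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
// On kotlinx.coroutines versions before 1.6, also import kotlinx.coroutines.flow.collect

fun <T1, T2, T3, R> Flow<T1>.zip(flow2: Flow<T2>, flow3: Flow<T3>, transform: suspend (T1, T2, T3) -> R): Flow<R> = channelFlow {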

    val first: ReceiveChannel<T1> = produce {
        this@zip.collect {
            channel.send(it)
        }
    }

    val second: ReceiveChannel<T2> = produce {
        flow2.collect {
            channel.send(it)
        }
    }

    val third: ReceiveChannel<T3> = produce {
        flow3.collect {
            channel.send(it)
        }
    }

    (second as SendChannel<*>).invokeOnClose {
        if (!first.isClosedForReceive) first.cancel(MyFlowException())
        if (!third.isClosedForReceive) third.cancel(MyFlowException())
    }

    (third as SendChannel<*>).invokeOnClose {
        if (!first.isClosedForReceive) first.cancel(MyFlowException())
        if (!second.isClosedForReceive) second.cancel(MyFlowException())
    }

    val otherIterator = second.iterator()
    val anotherIterator = third.iterator()

    try {
        first.consumeEach { value ->
            if (!otherIterator.hasNext() || !anotherIterator.hasNext()) {
                return@consumeEach
            }
            send(transform(value, otherIterator.next(), anotherIterator.next()))
        }
    } catch (e: MyFlowException) {
        // complete
    } finally {
        if (!second.isClosedForReceive) second.cancel(MyFlowException())
        if (!third.isClosedForReceive) third.cancel(MyFlowException())
    }
}

class MyFlowException: CancellationException()

Usage:

flow1.zip(flow2, flow3) { a, b, c ->
    //Do your work
}...
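
If you do not need a hand-written operator, a simpler option is to chain the built-in two-flow zip twice: zip the first two flows into a Pair, then zip that with the third. The following is a minimal sketch under that assumption, not a drop-in replacement for the channel-based version above. The name zip3 is hypothetical, chosen only to avoid clashing with the library's zip.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical helper: zips three flows by nesting the library's two-flow zip.
fun <T1, T2, T3, R> Flow<T1>.zip3(
    flow2: Flow<T2>,
    flow3: Flow<T3>,
    transform: suspend (T1, T2, T3) -> R
): Flow<R> =
    this.zip(flow2) { a, b -> a to b }                   // pair up the first two flows
        .zip(flow3) { (a, b), c -> transform(a, b, c) }  // unpack the pair and add the third

fun main() = runBlocking {
    val result = flowOf(1, 2, 3)
        .zip3(flowOf("a", "b", "c"), flowOf(true, false)) { n, s, flag -> "$n$s$flag" }
        .toList()
    // The third flow has only two elements, so the zipped flow stops after two.
    println(result) // [1atrue, 2bfalse]
}
```

This allocates an intermediate Pair per element, which is usually negligible, and it keeps the completion behaviour of the built-in zip: the result ends when the shortest flow ends.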
Glenn Sandoval answered Oct 04 '22 16:10