
Combine multiple Kotlin flows in a list without waiting for a first value

I have a List<Flow<T>>, and would like to generate a Flow<List<T>>. This is almost what combine does - except that combine waits for each and every Flow to emit an initial value, which is not what I want. Take this code for example:

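import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Emits immediately, then every 100 ms.
val a = flow {
  repeat(3) {
    emit("a$it")
    delay(100)
  }
}
// Emits every 150 ms, starting after the first delay.
val b = flow {
  repeat(3) {
    delay(150)
    emit("b$it")
  }
}
// Emits a single value after 400 ms.
val c = flow {
  delay(400)
  emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
  combine(flows) {
    it.toList()
  }.collect { println(it) }
}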

With combine used as-is, this is the output:

[a2, b1, c]
[a2, b2, c]

Whereas I'm interested in all the intermediary steps too. This is what I want from those three flows:

[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]

Right now I have two workarounds, but neither of them is great. The first one is plain ugly and doesn't work with nullable types:

val flows = listOf(a, b, c).map {
  flow {
    emit(null)
    it.collect { emit(it) }
  }
}
runBlocking {
  combine(flows) {
    it.filterNotNull()
  }.collect { println(it) }
}

By forcing all the flows to emit a first, irrelevant value, the combine transformer is indeed called, and lets me remove the null values which I know are not actual values. Iterating on that, more readable but heavier:

sealed class FlowValueHolder {
  object None : FlowValueHolder()
  data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
  flow {
    emit(FlowValueHolder.None)
    it.collect { emit(FlowValueHolder.Some(it)) }
  }
}
runBlocking {
  combine(flows) {
    it.filterIsInstance(FlowValueHolder.Some::class.java)
      .map { it.value }
  }.collect { println(it) }
}

Now this one works just fine, but still feels like I'm overdoing stuff. Is there a method that I'm missing in the coroutines library?
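The second workaround can be folded into a reusable helper so that the marker value never leaks to the call site. The following is a minimal sketch, assuming the standard `onStart` and `combine` operators; `combineEager` and `NoValue` are hypothetical names, not part of kotlinx.coroutines. Because the marker is compared by identity, flows of nullable types work too. No expected output is shown for the timed flows above, since the exact sequence of intermediate lists depends on how `combine` batches values.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical private marker meaning "this flow has not emitted yet".
private object NoValue

// Hypothetical helper: like combine, but does not wait for every flow.
@Suppress("UNCHECKED_CAST")
fun <T> combineEager(flows: List<Flow<T>>): Flow<List<T>> {
    val wrapped: List<Flow<Any?>> = flows.map { flow ->
        // Flow is covariant, so Flow<T> can be treated as Flow<Any?>.
        val upcast: Flow<Any?> = flow
        // Every flow emits the marker first, which unblocks combine.
        upcast.onStart { emit(NoValue) }
    }
    return combine(wrapped) { latest ->
        // Drop markers by identity, so real null values are kept.
        latest.filter { it !== NoValue }.map { it as T }
    }
}

fun main(): Unit = runBlocking {
    // The second flow never emits, yet the first flow's value still arrives.
    val results = combineEager(listOf(flowOf("x"), emptyFlow())).toList()
    println(results.last()) // [x]
}
```

The trade-off is the same as in the workarounds above: every source flow is wrapped once, in exchange for reusing the behaviour of `combine` unchanged.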

Marc Plano-Lesay asked Apr 13 '20 09:04



1 Answer

How about this:

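import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch

inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
    // One slot per flow: a "present" flag paired with the latest value.
    val array = Array(flows.size) {
        false to (null as T?) // first element stands for "present"
    }

    flows.forEachIndexed { index, flow ->
        // Collect every flow concurrently so none waits for the others.
        launch {
            flow.collect { emittedElement ->
                array[index] = true to emittedElement
                // Send only the values of flows that have emitted so far.
                send(array.filter { it.first }.map { it.second })
            }
        }
    }
}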

It solves a few problems:

  • no need to introduce a new type
  • [] is not in the resulting Flow
  • abstracts away null-handling (or however it is solved) from the call-site, the resulting Flow deals with it itself
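If the leading `[]` from the question is wanted after all, it can be prepended to any `Flow<List<T>>`. This is a minimal sketch, assuming the standard `onStart` operator; `startWithEmpty` is a hypothetical helper name, and the `flowOf` call only stands in for a combined flow.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits an empty list before anything from upstream.
fun <T> Flow<List<T>>.startWithEmpty(): Flow<List<T>> =
    onStart { emit(emptyList()) }

fun main(): Unit = runBlocking {
    // Stand-in for a combined flow such as instantCombine(a, b, c).
    val combined = flowOf(listOf("a0"), listOf("a0", "b0"))
    println(combined.startWithEmpty().toList()) // [[], [a0], [a0, b0]]
}
```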

So you won't notice any implementation-specific workarounds, because you don't have to deal with them during collection:

runBlocking {
    instantCombine(a, b, c).collect {
        println(it)
    }
}

Output:

[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]

Edit: Updated answer to handle Flows which emit null values too.


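Note on thread safety: each coroutine writes only to its own slot of the array, but a plain array provides no synchronization by itself. The code is safe when all collecting coroutines run on a single thread, as they do under runBlocking in the example. On a multi-threaded dispatcher, access to the array should be guarded, for example with a Mutex.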
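If the flows may be collected on a multi-threaded dispatcher, one cautious option is to guard the shared state with a `Mutex`. The following is a minimal sketch of that idea, not the original answer's code; `instantCombineSafe` is a hypothetical name, and it takes a `List<Flow<T>>` as in the question rather than a vararg.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical variant of instantCombine with mutex-guarded state.
@Suppress("UNCHECKED_CAST")
fun <T> instantCombineSafe(flows: List<Flow<T>>): Flow<List<T>> = channelFlow {
    val present = BooleanArray(flows.size)      // has flow i emitted yet?
    val latest = arrayOfNulls<Any>(flows.size)  // latest value of flow i
    val mutex = Mutex()

    flows.forEachIndexed { index, flow ->
        launch {
            flow.collect { value ->
                // Update and snapshot under the lock, so snapshots are
                // consistent and sent in the order the updates happened.
                mutex.withLock {
                    present[index] = true
                    latest[index] = value
                    send(latest.indices.filter { present[it] }.map { latest[it] as T })
                }
            }
        }
    }
}

fun main(): Unit = runBlocking {
    val results = instantCombineSafe(listOf(flowOf(1, 2), flowOf(10))).toList()
    println(results.last()) // [2, 10]
}
```

Only the final list is shown as output, because the order of the intermediate lists depends on how the collecting coroutines are scheduled.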

Willi Mentzel answered Oct 24 '22 21:10