
Flow: wait for a period, gather all emitted elements into a list, and keep this process running

I have a flow producer that emits elements at random intervals. I don't want to handle each element as soon as it is emitted; I'd rather gather them into a list, handle the list, and keep this process running.

For example, with a period of 2s, what I want to get for the flow below is:

val flow = flow{
  for(i in 1..5){
    emit(i)
    delay(1100)
  }
}
//#1 handle [1,2]
//#2 handle [3,4]
//#3 handle [5]

Below is what I tried but failed:

  • take(2): this makes the flow end after I get two elements, and it gives no control over time.
  • collectToList().sample(2s): collectToList() is my own custom function that adds elements to a list. The issue is that I need to clear the list in collectToList() after sample(), otherwise the list grows forever. But clearing it may cause concurrency issues, such as a new element being added to the list just before I clear it.

Any better ideas?

Toffee Lu asked Sep 17 '25 00:09


2 Answers

I think you can solve this by

  1. using a flow to 'tick' every 2 seconds,
  2. storing input values in a shared mutable state (MutableStateFlow),
  3. then, every time the flow 'ticks', you can fetch the windowed values, and reset the values.

I'm still learning Coroutines, so feedback would be helpful for everyone.

Timer flow

Create a flow that 'ticks' every 2 seconds

val windowDuration: Duration = 2.seconds

val timerFlow: Flow<Unit> = flow {
  while (true) {
    delay(windowDuration) // wait for the window to elapse before ticking
    println("---------tick---------")
    emit(Unit) // the value is unimportant
    yield()
  }
}

We'll use this timerFlow to determine the start and end of each window. When it 'ticks', we'll fetch the values that we've windowed, and reset the window.

Shared window state

Store the state of the window in a MutableStateFlow.

val state: MutableStateFlow<List<Int>> = MutableStateFlow(listOf())

(Using MutableStateFlow might not be the best solution. I'm just using it as a bucket to store a shared state. See the docs for alternatives for sharing mutable state)

Windowing the input values

Now, for each input value, update the MutableStateFlow...

// launch a new coroutine, so the inputs are collected in the 'background'
launch {
  inputValues
    .onEach { value: Int -> state.update { it + value } }
    .collect()
}

Fetch and reset the window

And now every time timerFlow ticks, we can fetch the values, and reset the shared state, so the next window starts from scratch.

val windowedValuesFlow: Flow<List<Int>> =
  timerFlow.map { _: Unit ->
    // tick!
    // emit the values from the previous window, and reset for the next window
    state.getAndUpdate {
      println("-----reset-window-----")
      emptyList()
    }
  }

windowedValuesFlow
  .onEach { windowedValues: List<Int> ->
    println(">>> window: $windowedValues")
  }
  .collect()
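The question worried about an element being added just before the list is cleared. As a rough check of why update and getAndUpdate avoid that, here is a minimal sketch: several writers append to the MutableStateFlow while another coroutine repeatedly swaps in an empty list, and every value ends up in exactly one drained window. The function name countDrained and its parameters are made up for this illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.getAndUpdate
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns how many values were drained in total.
fun countDrained(producers: Int = 4, perProducer: Int = 1_000): Int =
    runBlocking(Dispatchers.Default) {
        val state = MutableStateFlow<List<Int>>(emptyList())
        var drained = 0

        // Writers append concurrently; update retries if another coroutine changed the value.
        val writers = List(producers) { p ->
            launch {
                repeat(perProducer) { i -> state.update { it + (p * perProducer + i) } }
            }
        }

        // The drainer atomically takes the current list and replaces it with an empty one.
        val drainer = launch {
            while (writers.any { it.isActive }) {
                drained += state.getAndUpdate { emptyList() }.size
                yield()
            }
        }

        writers.joinAll()
        drainer.join()
        // Pick up anything added after the drainer's last pass.
        drained + state.getAndUpdate { emptyList() }.size
    }

fun main() {
    println(countDrained()) // prints 4000
}
```

Both calls use a compare-and-set loop internally, so the "read the window" and "reset the window" steps cannot be separated by a concurrent add.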

Prevent infinite running

Because timerFlow is infinite, this example runs indefinitely. You might want to collect windows only while they contain values. Note that this also stops at the first empty window, even if the input flow has not finished yet.

windowedValuesFlow
  .takeWhile { it.isNotEmpty() } // limit the length of the flow
  .onEach { windowedValues: List<Int> ->
    println(">>> window: $windowedValues")
  }
  .collect()
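takeWhile { it.isNotEmpty() } ends the flow at the first empty window, which may be too early if the input simply pauses for longer than one window. One possible variation, sketched here as an assumption rather than as part of the original answer, is to keep the Job returned by launch and stop only once the input has completed and the last window is empty. The names windowsUntilInputCompletes and inputJob and the timings are invented for the example, and it assumes everything runs on the single runBlocking thread.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.getAndUpdate
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: the input pauses for longer than one 300 ms window.
fun windowsUntilInputCompletes(): List<List<Int>> = runBlocking {
    val inputValues: Flow<Int> = flow {
        emit(1)
        delay(750) // longer than a window, so one window in between is empty
        emit(2)
    }
    val timerFlow: Flow<Unit> = flow {
        while (true) {
            delay(300)
            emit(Unit)
        }
    }
    val state = MutableStateFlow<List<Int>>(emptyList())

    // Keep the Job so the windowing side can tell when the input is done.
    val inputJob = launch {
        inputValues.collect { value -> state.update { it + value } }
    }

    timerFlow
        .map { state.getAndUpdate { emptyList() } }
        // Stop only when the input has finished AND nothing is left to report.
        .takeWhile { window -> inputJob.isActive || window.isNotEmpty() }
        // Skip empty windows that occur while the input is merely idle.
        .filter { it.isNotEmpty() }
        .toList()
}

fun main() {
    println(windowsUntilInputCompletes()) // prints [[1], [2]]
}
```

With the plain takeWhile { it.isNotEmpty() } check, the same input would end after the first window and the value 2 would never be reported.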

Example output

1: emitting
1: delaying 100ms
2: emitting
2: delaying 200ms
3: emitting
3: delaying 300ms
4: emitting
4: delaying 400ms
5: emitting
5: delaying 500ms
6: emitting
6: delaying 600ms
---------tick---------
-----reset-window-----
>>> window: [1, 2, 3, 4, 5, 6]
7: emitting
7: delaying 700ms
8: emitting
8: delaying 800ms
9: emitting
9: delaying 900ms
---------tick---------
-----reset-window-----
>>> window: [7, 8, 9]
10: emitting
10: delaying 1s
---------tick---------
-----reset-window-----
>>> window: [10]
---------tick---------
-----reset-window-----

Process finished with exit code 0

Full example

  • Kotlin 1.6.10
  • Kotlinx Coroutines 1.6.0

import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.getAndUpdate
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield


fun main() {

  val inputValues: Flow<Int> = flow {
    for (i in 1..10) {
      println("$i: emitting")
      emit(i)
      val pause = i.seconds / 10 // renamed so it does not shadow the delay() function
      println("$i: delaying $pause")
      delay(pause)
    }
  }

  val windowDuration: Duration = 2.seconds

  val timerFlow: Flow<Unit> = flow {
    while (true) {
      delay(windowDuration)
      println("---------tick---------")
      emit(Unit) // the value is unimportant
      yield()
    }
  }

  val state: MutableStateFlow<List<Int>> = MutableStateFlow(listOf())

  runBlocking {

    // launch a new coroutine, so the inputs are collected in the 'background'
    launch {
      inputValues
        .onEach { value: Int -> state.update { it + value } }
        .collect()
    }

    val windowedValuesFlow: Flow<List<Int>> =
      timerFlow.map { _: Unit ->
        // tick!
        // emit the values from the previous window, and reset for the next window
        state.getAndUpdate {
          println("-----reset-window-----")
          emptyList()
        }
      }
    
    windowedValuesFlow
      .takeWhile { it.isNotEmpty() } // limit the length of the flow
      .onEach { windowedValues: List<Int> ->
        println(">>> window: $windowedValues")
      }
      .collect()

  }
}
aSemy answered Sep 18 '25 15:09


The short answer is that there is no built-in function for this at the moment AFAIK.

I think what you're looking for is a time-based chunked operator, which has been discussed quite a bit, but still hasn't made it to the library. Not sure if it's in their plans for the near future.

Maybe you can take a look at the proposed implementation in this merge request and adapt it to your needs (you probably don't need as much flexibility, nor the extra chunking strategies). I'm sorry I'm afk so I can't do the adaptation myself.
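To make the idea of a time-based chunked operator concrete, here is a minimal sketch built only on channelFlow and a Mutex. It is not the implementation proposed in the merge request, and the name chunkedByTime and its periodMillis parameter are hypothetical, not part of kotlinx.coroutines. It assumes kotlinx.coroutines 1.6 or later.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical operator: emits the elements gathered during each period as a list.
fun <T> Flow<T>.chunkedByTime(periodMillis: Long): Flow<List<T>> = channelFlow {
    val mutex = Mutex()
    var buffer = mutableListOf<T>()

    // Send the current buffer and start a new one, all under the lock,
    // so no element can be added between "read" and "reset".
    suspend fun flush() {
        mutex.withLock {
            if (buffer.isNotEmpty()) {
                send(buffer)
                buffer = mutableListOf()
            }
        }
    }

    // Ticks once per period for as long as the upstream is running.
    val ticker = launch {
        while (true) {
            delay(periodMillis)
            flush()
        }
    }

    // Collect the upstream in this coroutine; returns when the upstream completes.
    this@chunkedByTime.collect { value -> mutex.withLock { buffer.add(value) } }

    ticker.cancelAndJoin()
    flush() // emit whatever is left in the last, partial period
}

// The flow from the question, collected with a 2 s period (takes about 5.5 s to run).
fun demoChunks(): List<List<Int>> = runBlocking {
    flow {
        for (i in 1..5) {
            emit(i)
            delay(1100)
        }
    }.chunkedByTime(2_000).toList()
}

fun main() {
    println(demoChunks()) // prints [[1, 2], [3, 4], [5]]
}
```

Unlike an infinite timer flow, this version completes when the upstream completes, and it skips periods in which nothing was emitted.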

Joffrey answered Sep 18 '25 14:09