I have a flow producer that emits elements at random intervals. I don't want to handle each element as soon as it is emitted; I'd rather gather the elements into a list, handle the list, and keep this process running.
For example, with a period of 2 seconds, this is what I want to get from the flow below:
val flow = flow {
    for (i in 1..5) {
        emit(i)
        delay(1100)
    }
}
//#1 handle [1,2]
//#2 handle [3,4]
//#3 handle [5]
Below is what I tried, without success:
- take(2): this makes the flow end after I get two elements, and there is no way to control the timing.
- collectToList().sample(2s): collectToList() is my own function that adds elements to a list. The issue is that I need to clear the list in collectToList() after sample(), otherwise the list keeps growing forever. But if I clear it, that may cause concurrency issues, such as a new element being added to the list just before I clear it.
Any better ideas?
I think you can solve this by
1. creating a flow to 'tick' every 2 seconds,
2. storing the values of each window in a shared state (for example, a MutableStateFlow),
3. each time the timer flow 'ticks', fetching the windowed values and resetting the state.
I'm still learning Coroutines, so feedback would be helpful for everyone.
Create a flow that 'ticks' every 2 seconds
val windowDuration: Duration = 2.seconds
val timerFlow: Flow<Unit> = flow {
    while (true) {
        delay(windowDuration) // wait one full window before each tick
        println("---------tick---------")
        emit(Unit) // the value is unimportant
        yield()
    }
}
We'll use this timerFlow to determine the start and end of each window. When it 'ticks', we'll fetch the values that we've windowed, and reset the window.
Store the state of the window in a MutableStateFlow.
val state: MutableStateFlow<List<Int>> = MutableStateFlow(listOf())
(Using MutableStateFlow might not be the best solution. I'm just using it as a bucket to store shared state. See the docs for alternatives for sharing mutable state.)
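As one possible alternative for the shared state, the window could live in a plain mutable list guarded by a Mutex from kotlinx.coroutines.sync. This is only a sketch under that assumption; WindowBuffer, add and drain are hypothetical names chosen for illustration, not part of any library.

```kotlin
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical helper (not part of kotlinx.coroutines): a list guarded by a Mutex.
class WindowBuffer<T> {
    private val mutex = Mutex()
    private var items = mutableListOf<T>()

    // Add one element to the current window.
    suspend fun add(item: T) {
        mutex.withLock { items.add(item) }
    }

    // Return the current window and start a fresh one while holding the lock,
    // so an element cannot be added between the read and the reset.
    suspend fun drain(): List<T> = mutex.withLock {
        val drained = items
        items = mutableListOf()
        drained
    }
}
```

The producer coroutine would call add for every input value, and the timer would call drain on every tick.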
Now, for each input value, update the MutableStateFlow:
// launch a new coroutine, so the inputs are collected in the 'background'
launch {
    inputValues
        .onEach { value: Int -> state.update { it + value } }
        .collect()
}
And now every time timerFlow ticks, we can fetch the values and reset the shared state, so the next window starts from scratch.
val windowedValuesFlow: Flow<List<Int>> =
    timerFlow.map { _: Unit ->
        // tick!
        // emit the values from the previous window, and reset for the next window
        state.getAndUpdate {
            println("-----reset-window-----")
            emptyList()
        }
    }
windowedValuesFlow
    .onEach { windowedValues: List<Int> ->
        println(">>> window: $windowedValues")
    }
    .collect()
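To illustrate why a single getAndUpdate call is enough for the reset: it returns the value held before the update and stores the new one atomically, so an element is either part of the window that was just fetched or part of the next one. A minimal sketch, where drainWindow is a hypothetical helper name and not part of kotlinx.coroutines:

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.getAndUpdate
import kotlinx.coroutines.flow.update

// Hypothetical helper: return the current window and reset the state in one atomic step.
fun <T> MutableStateFlow<List<T>>.drainWindow(): List<T> = getAndUpdate { emptyList() }

fun main() {
    val state = MutableStateFlow(listOf<Int>())
    state.update { it + 1 }
    state.update { it + 2 }

    println(state.drainWindow()) // [1, 2]
    println(state.value)         // []
}
```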
Because timerFlow is infinite, this example runs indefinitely. You might want to only collect windows while the windows have values.
windowedValuesFlow
    .takeWhile { it.isNotEmpty() } // limit the length of the flow
    .onEach { windowedValues: List<Int> ->
        println(">>> window: $windowedValues")
    }
    .collect()
Example output:
1: emitting
1: delaying 100ms
2: emitting
2: delaying 200ms
3: emitting
3: delaying 300ms
4: emitting
4: delaying 400ms
5: emitting
5: delaying 500ms
6: emitting
6: delaying 600ms
---------tick---------
-----reset-window-----
>>> window: [1, 2, 3, 4, 5, 6]
7: emitting
7: delaying 700ms
8: emitting
8: delaying 800ms
9: emitting
9: delaying 900ms
---------tick---------
-----reset-window-----
>>> window: [7, 8, 9]
10: emitting
10: delaying 1s
---------tick---------
-----reset-window-----
>>> window: [10]
---------tick---------
-----reset-window-----
Process finished with exit code 0
Full example:
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.getAndUpdate
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

fun main() {
    val inputValues: Flow<Int> = flow {
        for (i in 1..10) {
            println("$i: emitting")
            emit(i)
            val delay = i.seconds / 10
            println("$i: delaying $delay")
            delay(delay)
        }
    }

    val windowDuration: Duration = 2.seconds
    val timerFlow: Flow<Unit> = flow {
        while (true) {
            delay(windowDuration)
            println("---------tick---------")
            emit(Unit) // the value is unimportant
            yield()
        }
    }

    val state: MutableStateFlow<List<Int>> = MutableStateFlow(listOf())

    runBlocking {
        // launch a new coroutine, so the inputs are collected in the 'background'
        launch {
            inputValues
                .onEach { value: Int -> state.update { it + value } }
                .collect()
        }

        val windowedValuesFlow: Flow<List<Int>> =
            timerFlow.map { _: Unit ->
                // tick!
                // emit the values from the previous window, and reset for the next window
                state.getAndUpdate {
                    println("-----reset-window-----")
                    emptyList()
                }
            }

        windowedValuesFlow
            .takeWhile { it.isNotEmpty() } // limit the length of the flow
            .onEach { windowedValues: List<Int> ->
                println(">>> window: $windowedValues")
            }
            .collect()
    }
}
The short answer is that there is no built-in function for this at the moment AFAIK.
I think what you're looking for is a time-based chunked operator, which has been discussed quite a bit but still hasn't made it into the library. I'm not sure whether it's in their plans for the near future.
Maybe you can take a look at the proposed implementation in this merge request and adapt it to your needs (you probably don't need as much flexibility, nor the extra chunking strategies). I'm sorry I'm afk so I can't do the adaptation myself.
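For reference, here is a rough sketch of what a simplified time-based chunking operator could look like. It is not the implementation from the linked proposal; chunkedByTime is a hypothetical name, and the sketch assumes a recent kotlinx.coroutines version (1.6 or later) where collect accepts a lambda directly and delay accepts a Duration. It buffers upstream values under a Mutex, flushes the buffer once per period from a timer coroutine, and flushes one last time when the upstream flow completes.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical operator, not part of kotlinx.coroutines.
fun <T> Flow<T>.chunkedByTime(period: Duration): Flow<List<T>> = channelFlow {
    val mutex = Mutex()
    var buffer = mutableListOf<T>()

    // Swap the buffer under the lock, then send the chunk if it has any elements.
    suspend fun flush() {
        val chunk = mutex.withLock {
            val full = buffer
            buffer = mutableListOf()
            full
        }
        if (chunk.isNotEmpty()) send(chunk)
    }

    // Timer coroutine: flush once per period.
    val ticker = launch {
        while (true) {
            delay(period)
            flush()
        }
    }

    // Collect the upstream flow into the buffer until it completes.
    this@chunkedByTime.collect { value -> mutex.withLock { buffer.add(value) } }

    // Upstream is done: stop the timer and emit whatever is left.
    ticker.cancel()
    flush()
}

fun main() = runBlocking {
    // The flow from the question: five elements, 1100 ms apart.
    val input = flow {
        for (i in 1..5) {
            emit(i)
            delay(1100)
        }
    }
    input.chunkedByTime(2.seconds).collect { println("handle $it") }
}
```

With the flow from the question and a 2 second period, this should print handle [1, 2], handle [3, 4] and handle [5]. Unlike the takeWhile approach, empty windows are skipped rather than ending the flow, and the flow ends when the upstream flow completes.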