
Kotlin Flow: receiving values until emitting timeout

I want to collect a specific number of values from a Flow, stopping early if a timeout elapses without a new value being emitted. As far as I can tell there is no such operator, so I tried to implement my own using the debounce operator.

The first problem is that the producer is too fast, so some packages are skipped and never collected (they show up in an onEach on the original packages flow, but not in an onEach on the second flow passed to merge in withNullOnTimeout).

The second problem: after the last value is taken according to the amount argument, the original flow is closed, but the timeout flow is still alive and eventually produces a timeout after the last value.

How can I solve these two problems?

My implementation:


suspend fun receive(packages: Flow<ByteArray>, amount: Int): ByteArray {
    // blockSize is defined elsewhere in the surrounding class
    val buffer = ByteArrayOutputStream(blockSize.toInt())
    packages
        .take(amount) // stop after the requested number of packages
        .takeUntilTimeout(100) // <-- custom timeout operator
        .collect { pck ->
            buffer.write(pck) // pck is already a ByteArray
        }
    return buffer.toByteArray()
}

fun <T> Flow<T>.takeUntilTimeout(durationMillis: Long): Flow<T> {
    require(durationMillis > 0) { "Duration should be greater than 0" }
    return withNullOnTimeout(durationMillis)
        .takeWhile { it != null }
        .mapNotNull { it }
}

fun <T> Flow<T>.withNullOnTimeout(durationMillis: Long): Flow<T?> {
    require(durationMillis > 0) { "Duration should be greater than 0" }
    return merge(
        this,
        map<T, T?> { null }
            .onStart { emit(null) }
            .debounce(durationMillis)
    )
}
Alektas asked Oct 20 '25 13:10


2 Answers

How about:

fun <T> Flow<T>.takeUntilTimeout(timeoutMillis: Long) = channelFlow {
    val collector = launch {
        collect {
            send(it)
        }
        close()
    }
    delay(timeoutMillis)
    collector.cancel()
    close()
}

Using a channelFlow allows you to spawn a second coroutine so you can count the time independently, and quite simply.
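
Note that the operator above counts a single deadline from the start of collection. If the goal is a timeout between consecutive values, as the question describes, the same channelFlow idea can be adapted. The following is only a sketch under assumptions: the names takeUntilIdle and idleDemo are made up for illustration, and it assumes a reasonably recent kotlinx.coroutines version that provides produceIn and receiveCatching.

```kotlin
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical operator: completes when no value arrives within
// timeoutMillis of the previous one (or of the start of collection).
fun <T> Flow<T>.takeUntilIdle(timeoutMillis: Long): Flow<T> = channelFlow {
    // Collect the upstream flow in a child coroutine, exposed as a channel.
    val upstream = this@takeUntilIdle.produceIn(this)
    while (true) {
        // A null result means the wait for the next value timed out.
        val result = withTimeoutOrNull(timeoutMillis) { upstream.receiveCatching() } ?: break
        if (result.isClosed) {
            result.exceptionOrNull()?.let { throw it } // rethrow an upstream failure
            break // upstream completed normally
        }
        send(result.getOrThrow())
    }
    upstream.cancel() // stop the upstream collection once we are done
}

// Hypothetical demo: the third value comes after a pause longer than the timeout.
fun idleDemo(): List<Int> = runBlocking {
    flow {
        emit(1)
        delay(20)
        emit(2)
        delay(500) // longer than the 200 ms idle timeout below
        emit(3)
    }.takeUntilIdle(200).toList()
}
```

With this shape, applying take(amount) after the timeout operator should also cancel the timer together with the upstream once the last value is taken, since completing the downstream cancels the whole channelFlow. One caveat: a value that arrives at the very moment the timeout fires could in principle be dropped.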

Joffrey answered Oct 23 '25 06:10


This was what initially seemed obvious to me, but as Joffrey points out in the comments, it can cause an unnecessary delay before collection terminates, because the deadline is only checked when the next value arrives. I'll leave this as an example of a suboptimal, oversimplified solution.

fun <T> Flow<T>.takeUntilTimeout(durationMillis: Long): Flow<T> = flow {
    val endTime = System.currentTimeMillis() + durationMillis
    takeWhile { System.currentTimeMillis() < endTime }
        .collect { emit(it) }
}
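
To make the delay concrete, here is a small sketch that times this approach against a slow producer. The names takeUntilDeadline and deadlineDemo are hypothetical, and the operator is a copy of the wall-clock idea that keeps values while the current time is before the deadline.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical copy of the wall-clock approach under a different name.
fun <T> Flow<T>.takeUntilDeadline(durationMillis: Long): Flow<T> = flow {
    val endTime = System.currentTimeMillis() + durationMillis
    takeWhile { System.currentTimeMillis() < endTime }
        .collect { emit(it) }
}

// Returns the collected values and how long collection took, in milliseconds.
fun deadlineDemo(): Pair<List<Int>, Long> = runBlocking {
    val slow = flow {
        emit(1)
        delay(300) // the 100 ms deadline passes during this pause
        emit(2)    // only now does takeWhile get a chance to stop the flow
    }
    var values = emptyList<Int>()
    val elapsed = measureTimeMillis {
        values = slow.takeUntilDeadline(100).toList()
    }
    values to elapsed
}
```

Calling deadlineDemo() should return only the first value, but with an elapsed time close to the 300 ms pause rather than the 100 ms deadline.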

Here's an alternate idea I didn't test.

@Suppress("UNCHECKED_CAST")
fun <T> Flow<T>.takeUntilTimeout(durationMillis: Long): Flow<T> {
    val signal = Any()
    return merge<Any?>(this, flow { delay(durationMillis); emit(signal) })
        .takeWhile { it !== signal } as Flow<T>
}
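
Since the idea above is untested, here is a rough way to check it. This sketch assumes the upstream flow is passed to the top-level merge alongside the signal flow (merge has no member or extension form that would pick up the receiver implicitly); takeUntilSignal and signalDemo are hypothetical names.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

@Suppress("UNCHECKED_CAST")
fun <T> Flow<T>.takeUntilSignal(durationMillis: Long): Flow<T> {
    val signal = Any() // unique sentinel, compared by identity
    // Merge the upstream with a one-shot timer flow that emits the sentinel.
    return merge<Any?>(this, flow { delay(durationMillis); emit(signal) })
        .takeWhile { it !== signal } as Flow<T>
}

// Returns the collected values and how long collection took, in milliseconds.
fun signalDemo(): Pair<List<Int>, Long> = runBlocking {
    val slow = flow {
        emit(1)
        delay(1000) // the 100 ms timer fires during this pause
        emit(2)
    }
    var values = emptyList<Int>()
    val elapsed = measureTimeMillis {
        values = slow.takeUntilSignal(100).toList()
    }
    values to elapsed
}
```

Here collection should end when the timer fires rather than when the next value arrives, because takeWhile sees the sentinel as soon as it is emitted and cancels both merged flows.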
Tenfour04 answered Oct 23 '25 06:10


