I have an UNLIMITED-size buffered channel where senders are much faster than receivers. I would like to update the buffer by removing old data and replacing it with newer data (if the receiver has not yet consumed it).
Here is my code:
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
data class Item(val id: Int, val value: Int)
val testData = listOf(
    Item(1, 10),
    Item(2, 24),
    Item(3, 12),
    Item(1, 17), // This one should replace Item(1, 10) if it's not yet consumed
    Item(4, 16),
    Item(2, 32), // This one should replace Item(2, 24) if it's not yet consumed
)
suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Item>(Channel.UNLIMITED)
    launch {
        for (item in testData) {
            delay(50)
            println("Producing item $item")
            channel.send(item)
        }
    }
    // As you can see, the sender has already sent all the testData and the items are
    // waiting in the buffer to be consumed by the receiver.
    // I would like to do some checks whenever a new item is added to the buffer:
    // if (itemInBuffer.id == newItem.id && itemInBuffer.value < newItem.value) then replace it with newItem
    launch {
        for (item in channel) {
            delay(5000)
            println(item.toString())
        }
    }
}
Is there any built-in Kotlin function which takes some custom condition and removes items from the buffer? I saw there is a function called distinctUntilChangedBy in Flow which removes duplicate data based on a custom key selector. Is there anything similar available for Channel, or is it possible to achieve this with channelFlow? (Note: in my real code the events come from network calls, so I'm not sure channelFlow would be suitable there.)
This isn't as simple as it sounds. We can't access the channel's queue to modify its contents, and even if we could, it wouldn't be easy to find an item with the same id - we would have to iterate over the whole queue. distinctUntilChangedBy() is a very different case, because it only compares against the last item; it doesn't look through the whole queue.
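For completeness: the closest built-in behavior is a conflated channel, but it conflates globally - each send simply overwrites the single undelivered element, with no notion of a key. A minimal sketch (reusing the Item class from the question):

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

data class Item(val id: Int, val value: Int)

fun main() = runBlocking {
    // CONFLATED keeps at most one undelivered element; send never suspends.
    val channel = Channel<Item>(Channel.CONFLATED)
    channel.send(Item(1, 10))
    channel.send(Item(2, 24))
    channel.send(Item(1, 17))
    // Only the most recently sent element survives, regardless of id:
    println(channel.receive()) // prints Item(id=1, value=17)
}
```

So conflation drops everything but the latest item overall, which is not the per-key replacement you want.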
I think our best bet is not to use the queue provided by the channel, but instead to store the data ourselves in a map and only provide send and receive functionality on top of it. I implemented this as a flow-like operator and made it generic, so it can be used in other similar cases:
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.selects.select

// Requires the context receivers compiler feature (-Xcontext-receivers).
context(CoroutineScope)
fun <T : Any, K> ReceiveChannel<T>.groupingReduce(keySelector: (T) -> K, reduce: (T, T) -> T): ReceiveChannel<T> = produce {
    val items = mutableMapOf<K, T>()
    while (!isClosedForReceive) {
        select<Unit> {
            if (items.isNotEmpty()) {
                // Offer the oldest buffered item to the consumer.
                val (key, item) = items.entries.first()
                onSend(item) {
                    items.remove(key)
                }
            }
            // Concurrently accept the next item from the source channel,
            // merging it with any buffered item that shares its key.
            onReceiveCatching { result ->
                val item = result.getOrElse { return@onReceiveCatching }
                items.merge(keySelector(item), item, reduce)
            }
        }
    }
    // The source channel is closed - flush the remaining buffered items.
    items.values.forEach { send(it) }
}
It keeps the data in a map and tries to send and receive at the same time - whichever finishes first wins. If it receives an item whose key is already in the map, it lets the caller merge both values. It sends items in the order in which each key first appeared in the source channel, so a newer value for an existing key doesn't push that item back to the end of the queue.
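The per-key bookkeeping can be seen in isolation, without any coroutines. On the JVM, mutableMapOf() returns an insertion-ordered LinkedHashMap, and MutableMap.merge() applies the reduce function only when the key is already present. A small standalone sketch, using the same max-by-value reduce as the usage examples:

```kotlin
data class Item(val id: Int, val value: Int)

fun main() {
    val items = mutableMapOf<Int, Item>() // insertion-ordered on the JVM
    val reduce = { it1: Item, it2: Item -> if (it1.value > it2.value) it1 else it2 }
    listOf(Item(1, 10), Item(2, 24), Item(1, 17), Item(3, 12)).forEach {
        items.merge(it.id, it, reduce)
    }
    // Item(1, 17) replaced Item(1, 10) but kept its original position:
    println(items.values.toList())
    // [Item(id=1, value=17), Item(id=2, value=24), Item(id=3, value=12)]
}
```

This is exactly why the operator emits items in first-appearance order: updating a value in a LinkedHashMap does not move its key.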
This is how we can use it with your example. I modified it a little, as your version is confusing to me: it consumes (1, 10) before producing (1, 17), so the example is actually incorrect. Also, the producer and consumer don't really run at the same time, so launching them concurrently and adding delays doesn't change much:
suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Item>(Channel.UNLIMITED)
    val channel2 = channel.groupingReduce(
        keySelector = { it.id },
        reduce = { it1, it2 -> if (it1.value > it2.value) it1 else it2 }
    )
    for (item in testData) {
        println("Producing item $item")
        channel.send(item)
    }
    channel.close()
    // Needed because with `UNLIMITED` sending is almost immediate,
    // so consuming would otherwise start at the same time as producing.
    delay(100)
    for (item in channel2) {
        println(item.toString())
    }
}
I created another example where the producer and consumer actually run concurrently. Items are produced every 100ms and consumed every 200ms, with an initial delay of 50ms.
suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Item>(Channel.UNLIMITED)
    val channel2 = channel.groupingReduce(
        keySelector = { it.id },
        reduce = { it1, it2 -> if (it1.value > it2.value) it1 else it2 }
    )
    launch {
        delay(50)
        for (item in channel2) {
            println(item.toString())
            delay(200)
        }
    }
    launch {
        listOf(
            Item(1, 10),
            // consume: 1, 10
            Item(2, 20),
            Item(1, 30),
            // consume: 2, 20
            Item(3, 40),
            Item(1, 50),
            // consume: 1, 50
            Item(4, 60),
            Item(1, 70),
            // consume: 3, 40
            Item(5, 80),
            // consume: 4, 60
            // consume: 1, 70
            // consume: 5, 80
        ).forEach {
            channel.send(it)
            delay(100)
        }
        channel.close()
    }
}
Maybe there is a better way to solve this. Also, to be honest, I'm not 100% sure this code is correct - maybe I missed some corner case around channel closing, cancellation, or failures. Additionally, I'm not sure whether select { onSend() } guarantees that if the code block has not been executed, then the item has not been sent. If we cancel send(), we have no guarantee that the item was not sent; it may be the same in this case.