
How do I modify Kotlin Flow distinctUntilChanged to add an expiry time?

How can I use distinctUntilChanged() but also give it an expiry? That is, if the same value appears in the flow again, it should still be collected when it arrives more than expiry milliseconds after the previous duplicate was emitted.

flow { 
  emit("A")    // printed
  emit("B")    // printed
  emit("A")    // printed
  emit("A")    // NOT printed because duplicate
  delay(5000)
  emit("A")    // printed because 5 seconds elapsed which is more than expiry
}
  .distinctUntilChanged(expiry = 2000)
  .collect {
    println(it)
  }

I would like this to print:

A
B
A
A

Here's the code to test it:

  @Test
  fun `distinctUntilChanged works as expected`(): Unit = runBlocking {
    flow {
      emit("A")    // printed
      emit("B")    // printed
      emit("A")    // printed
      emit("A")    // NOT printed because duplicate
      delay(5000)
      emit("A")    // printed because 5 seconds elapsed which is more than expiry
    }
      .distinctUntilChanged(expiry = 2000)
      .toList().also {
        assertEquals("A", it[0])
        assertEquals("B", it[1])
        assertEquals("A", it[2])
        assertEquals("A", it[3])
      }
  }
asked Oct 27 '25 05:10 by vovahost


2 Answers

I think this will work, but I haven't tested it much. The logic should be self-explanatory. havePreviousValue exists in case T is nullable and the first emitted value is null; without it, a leading null could not be told apart from "nothing emitted yet".

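import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun <T> Flow<T>.distinctUntilChanged(expiry: Long): Flow<T> = flow {
    var havePreviousValue = false   // false until the first value has been emitted
    var previousValue: T? = null
    var previousTime: Long = 0      // wall-clock time of the last emission, in ms
    collect {
        // Emit the first value, any changed value, or a repeat that arrives after the expiry.
        if (!havePreviousValue || previousValue != it || previousTime + expiry < System.currentTimeMillis()) {
            emit(it)
            havePreviousValue = true
            previousValue = it
            previousTime = System.currentTimeMillis()
        }
    }
}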
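Since the version above reads System.currentTimeMillis() directly, testing it means really waiting out the delay. Here is a rough sketch of the same logic with an injectable clock, assuming Kotlin 1.9+ where kotlin.time.TimeSource and TestTimeSource are stable. The names distinctUntilChangedWithExpiry and expiryDemo are made up for this example, and I have not run it against a real project.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.TestTimeSource
import kotlin.time.TimeMark
import kotlin.time.TimeSource

// Hypothetical name, chosen to avoid clashing with the built-in operator.
fun <T> Flow<T>.distinctUntilChangedWithExpiry(
    expiry: Duration,
    timeSource: TimeSource = TimeSource.Monotonic, // monotonic, so clock adjustments do not matter
): Flow<T> = flow {
    var previousValue: T? = null
    var previousMark: TimeMark? = null // null until the first emission
    collect { value ->
        val mark = previousMark
        // Emit the first value, any changed value, or a repeat older than the expiry.
        if (mark == null || previousValue != value || mark.elapsedNow() > expiry) {
            previousValue = value
            previousMark = timeSource.markNow()
            emit(value)
        }
    }
}

// Usage with a fake clock: advancing the clock replaces the real 5 second delay.
fun expiryDemo(): List<String> = runBlocking {
    val clock = TestTimeSource()
    flow {
        emit("A"); emit("B"); emit("A"); emit("A")
        clock += 5.seconds
        emit("A")
    }.distinctUntilChangedWithExpiry(expiry = 2.seconds, timeSource = clock).toList()
}
```

As in the original, the timestamp is only refreshed when a value is actually emitted, so the expiry is measured from the last emitted value and not from the last dropped duplicate.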
answered Oct 30 '25 11:10 by Tenfour04


Another option:

fun <T> Flow<T>.distinctUntilChanged(expiry: Long) = 
    merge(
        distinctUntilChanged(), 
        channelFlow {
            val shared = buffer(Channel.CONFLATED).produceIn(this)
            debounce(expiry).takeWhile { !shared.isClosedForReceive }.collect { 
                val lastVal = shared.receive()
                shared.receive().let { if(lastVal == it) send(it) }
            }
        }
    )

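It basically merges the output of the standard distinctUntilChanged() with a second flow, driven by debounce(expiry), that adds back the non-distinct value. Note that the upstream flow is collected more than once here (by distinctUntilChanged(), buffer() and debounce()), so this is probably only suitable for a hot flow such as a SharedFlow.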
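A different way to build on the standard operator, sketched here as an alternative to merging a second flow: tag each value with a timestamp and pass a custom equivalence check to the built-in distinctUntilChanged(areEquivalent) overload. This collects the upstream only once. The names Timestamped, distinctUntilChangedExpiring and the now parameter are invented for this example, and it relies on distinctUntilChanged comparing each value against the last one it let through.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a value paired with the time it was seen.
private data class Timestamped<T>(val value: T, val timeMillis: Long)

// Hypothetical name; `now` is injectable so the clock can be faked in tests.
fun <T> Flow<T>.distinctUntilChangedExpiring(
    expiryMillis: Long,
    now: () -> Long = System::currentTimeMillis,
): Flow<T> =
    map { Timestamped(it, now()) }
        // Two items count as duplicates only if the values match AND the new one
        // arrived within the expiry window of the last item that passed through.
        .distinctUntilChanged { old, new ->
            old.value == new.value && new.timeMillis - old.timeMillis <= expiryMillis
        }
        .map { it.value }

// Usage with a fake clock instead of a real delay.
fun expiringDemo(): List<String> {
    var fakeNow = 0L
    return runBlocking {
        flow {
            emit("A"); emit("B"); emit("A"); emit("A")
            fakeNow += 5_000 // pretend 5 seconds have passed
            emit("A")
        }.distinctUntilChangedExpiring(expiryMillis = 2_000, now = { fakeNow }).toList()
    }
}
```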

answered Oct 30 '25 12:10 by Ajmal Kunnummal