I used a PublishSubject and I was sending messages to it and also I was listening for results. It worked flawlessly, but now I'm not sure how to do the same thing with Kotlin's coroutines (flows or channels).
private val subject = PublishProcessor.create<Boolean>>()
...
fun someMethod(b: Boolean) {
subject.onNext(b)
}
fun observe() {
subject.debounce(500, TimeUnit.MILLISECONDS)
.subscribe { /* value received */ }
}
Since I need the debounce operator I really wanted to do the same thing with flows so I created a channel and then I tried to create a flow from that channel and listen to changes, but I'm not getting any results.
private val channel = Channel<Boolean>()
...
fun someMethod(b: Boolean) {
channel.send(b)
}
fun observe() {
flow {
channel.consumeEach { value ->
emit(value)
}
}.debounce(500, TimeUnit.MILLISECONDS)
.onEach {
// value received
}
}
What is wrong?
In coroutines, a flow is a type that can emit multiple values sequentially, as opposed to suspend functions that return only a single value. For example, you can use a flow to receive live updates from a database. Flows are built on top of coroutines and can provide multiple values.
Coroutines can be executed concurrently using a multi-threaded dispatcher like the Dispatchers.
We can for example schedule coroutines on a Java Executor or on Android main looper. However, we can't schedule coroutines on just any thread, it has to cooperate.
Flow is a cold asynchronous stream, just like an Observable.
All transformations on the flow, such as
mapandfilterdo not trigger flow collection or execution, only terminal operators (e.g.single) do trigger it.
The onEach method is just a transformation. Therefore you should replace it with the terminal flow operator collect. Also you could use a BroadcastChannel to have cleaner code:
private val channel = BroadcastChannel<Boolean>(1) suspend fun someMethod(b: Boolean) { channel.send(b) } suspend fun observe() { channel .asFlow() .debounce(500) .collect { // value received } } Update: At the time the question was asked there was an overload of debounce with two parameters (like in the question). There is not anymore. But now there is one which takes one argument in milliseconds (Long).
It should be SharedFlow/MutableSharedFlow for PublishProcessor/PublishRelay
private val _myFlow = MutableSharedFlow<Boolean>( replay = 0, extraBufferCapacity = 1, // you can increase BufferOverflow.DROP_OLDEST ) val myFlow = _myFlow.asSharedFlow() // ... fun someMethod(b: Boolean) { _myFlow.tryEmit(b) } fun observe() { myFlow.debounce(500) .onEach { } // flowOn(), catch{} .launchIn(coroutineScope) } And StateFlow/MutableStateFlow for BehaviorProcessor/BehaviorRelay.
private val _myFlow = MutableStateFlow<Boolean>(false) val myFlow = _myFlow.asStateFlow() // ... fun someMethod(b: Boolean) { _myFlow.value = b // same as _myFlow.emit(v), myFlow.tryEmit(b) } fun observe() { myFlow.debounce(500) .onEach { } // flowOn(), catch{} .launchIn(coroutineScope) } StateFlow must have initial value, if you don't want that, this is workaround:
private val _myFlow = MutableStateFlow<Boolean?>(null) val myFlow = _myFlow.asStateFlow() .filterNotNull() MutableStateFlow uses .equals comparison when setting new value, so it does not emit same value again and again (versus distinctUntilChanged which uses referential comparison).
So MutableStateFlow ≈ BehaviorProcessor.distinctUntilChanged(). If you want exact BehaviorProcessor behavior then you can use this:
private val _myFlow = MutableSharedFlow<Boolean>( replay = 1, extraBufferCapacity = 0, BufferOverflow.DROP_OLDEST )
ArrayBroadcastChannel in Kotlin coroutines is the one most similar to PublishSubject.
Unlike PublishSubject, backpressure is inbuilt into the coroutine channels, and that is where the buffer capacity comes in. This number really depends on which use case the channel is being used for. For most of the normal use cases, I just go with 10, which should be more than enough. If you push events faster to this channel than receivers consuming it, you can give more capacity.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With