How to create a polling mechanism with kotlin coroutines?

I am trying to create a polling mechanism with Kotlin coroutines using SharedFlow. Polling should stop when there are no subscribers and be active while there is at least one subscriber. Is SharedFlow the right choice for this scenario, or should I use a Channel? I tried channelFlow, but I don't know how to close the channel (not cancel the job) from outside the block body. Can someone help? Here's the snippet.

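fun poll(id: String) = channelFlow {
    // Register the close handler once, before the loop; registering it a second time throws IllegalStateException.
    invokeOnClose { Timber.e("channel flow closed.") }
    while (!isClosedForSend) {
        try {
            send(repository.getDetails(id))
            delay(MIN_REFRESH_TIME_MS)
        } catch (e: CancellationException) {
            throw e // let cancellation propagate instead of logging it and looping again
        } catch (throwable: Throwable) {
            Timber.e("error -> ${throwable.message}")
        }
    }
}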
Omkar Amberkar asked Sep 02 '25 16:09



1 Answer

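You can use a SharedFlow, which broadcasts each value to all current collectors. With the default configuration (no replay, no extra buffer), emit suspends until every current collector has received the value, and it returns immediately when there are no collectors. MutableSharedFlow also exposes subscriptionCount, which you can observe to start the producer when the first collector arrives and cancel it when the last one leaves: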

val sharedFlow = MutableSharedFlow<String>()
val scope = CoroutineScope(Job() + Dispatchers.IO)
var producer: Job? = null

scope.launch {
    // Start producing when the first subscriber appears; stop when the last one leaves.
    sharedFlow.subscriptionCount
        .map { count -> count > 0 }
        .distinctUntilChanged()
        .collect { hasSubscribers ->
            if (hasSubscribers) startProducing() else stopProducing()
        }
}

fun CoroutineScope.startProducing() {
    producer = launch {
        sharedFlow.emit(...)
    }
}

fun stopProducing() {
    // Safe to call before any producer has been started.
    producer?.cancel()
}
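
For anyone who wants something they can paste and run, here is a minimal self-contained sketch of the subscriptionCount idea above. It is only one way to wire it up, not a definitive implementation. The Poller class, its intervalMs parameter and the fetch lambda are hypothetical names introduced for illustration (fetch stands in for repository.getDetails(id)). It also uses collectLatest instead of keeping a producer Job in a variable: collectLatest cancels the running polling loop as soon as the subscriber state changes.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: polls `fetch` every `intervalMs` while `updates` has at least one collector.
class Poller<T>(
    private val scope: CoroutineScope,
    private val intervalMs: Long,
    private val fetch: suspend () -> T, // assumption: stands in for repository.getDetails(id)
) {
    private val _updates = MutableSharedFlow<T>()
    val updates: SharedFlow<T> = _updates.asSharedFlow()

    // Launches the supervisor coroutine; cancel the returned Job to shut the poller down.
    fun start(): Job = scope.launch {
        _updates.subscriptionCount
            .map { count -> count > 0 }
            .distinctUntilChanged()
            .collectLatest { hasSubscribers ->
                // When the last collector leaves, collectLatest cancels the loop below
                // and this block runs again with hasSubscribers == false.
                if (!hasSubscribers) return@collectLatest
                while (true) {
                    _updates.emit(fetch())
                    delay(intervalMs)
                }
            }
    }
}

fun main() = runBlocking {
    var calls = 0
    val poller = Poller(scope = this, intervalMs = 50L) {
        delay(10) // simulate a network call
        "details #${++calls}"
    }
    val job = poller.start()

    delay(100) // nobody is collecting yet, so nothing should be fetched
    println("fetches with no collectors: $calls")

    // Collecting starts the polling; take(3) unsubscribes after three values.
    println(poller.updates.take(3).toList())

    job.cancel() // otherwise runBlocking would wait for the supervisor forever
}
```

If the polling logic can be written as a cold flow, the library's shareIn operator with SharingStarted.WhileSubscribed() gives the same start/stop-on-subscribers behaviour without tracking subscriptionCount by hand, so that may be worth considering as well.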
J.Grbo answered Sep 04 '25 09:09
