I'm trying to wrap a callbackFlow inside an outer flow: there are items I'd like to emit from the outer flow, but I also have an old callback interface that I'd like to adapt to a Kotlin Flow. I've looked at several callbackFlow examples, but I can't figure out how to trigger one properly from within another flow.
Here's an example:
class Processor {
    fun start(processProgress: ProcessProgressListener) {
        processProgress.onFinished() // finishes as soon as it starts!
    }
}

interface ProcessProgressListener {
    fun onFinished()
}

// main method here:
fun startProcess(processor: Processor): Flow<String> {
    val mainFlow = flow {
        emit("STARTED")
        emit("IN_PROGRESS")
    }
    return merge(processProgressFlow(processor), mainFlow)
}

fun processProgressFlow(processor: Processor) = callbackFlow {
    val listener = object : ProcessProgressListener {
        override fun onFinished() {
            trySend("FINISHED")
        }
    }
    processor.start(listener)
}
The Processor takes a listener, which is triggered when the process has finished. When that happens, I would like to emit the final item FINISHED.
The way I invoke the whole flow is as follows:
runBlocking {
    startProcess(Processor()).collect {
        print(it)
    }
}
But I get no output whatsoever. If I drop the merge and return only mainFlow, I do get the STARTED and IN_PROGRESS items.
What am I doing wrong?
You forgot to call awaitClose at the end of the callbackFlow block:
fun processProgressFlow(processor: Processor) = callbackFlow<String> {
    val listener = object : ProcessProgressListener {
        override fun onFinished() {
            trySend("FINISHED")
            channel.close()
        }
    }
    processor.start(listener)
    /*
     * Suspends until 'channel.close() or cancel()' is invoked
     * or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
     * In both cases, callback will be properly unregistered.
     */
    awaitClose { /* unregister listener here */ }
}
awaitClose {} should be called at the end of the callbackFlow block.
Otherwise, the callback/listener may leak if the collection is cancelled externally.
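For reference, here is the question's code with that fix applied, assembled into one file as a minimal sketch. It assumes a kotlinx.coroutines version that provides trySend (1.5 or later); the main function is only added here to make the sketch runnable.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

interface ProcessProgressListener {
    fun onFinished()
}

class Processor {
    fun start(processProgress: ProcessProgressListener) {
        processProgress.onFinished() // finishes as soon as it starts
    }
}

fun processProgressFlow(processor: Processor): Flow<String> = callbackFlow {
    val listener = object : ProcessProgressListener {
        override fun onFinished() {
            trySend("FINISHED") // non-suspending send into the flow's buffer
            channel.close()     // no more events will come, so complete the flow
        }
    }
    processor.start(listener)
    // Returns immediately if the channel is already closed, otherwise
    // suspends until it is closed or the collector is cancelled.
    awaitClose { /* unregister listener here */ }
}

fun startProcess(processor: Processor): Flow<String> {
    val mainFlow = flow {
        emit("STARTED")
        emit("IN_PROGRESS")
    }
    return merge(processProgressFlow(processor), mainFlow)
}

fun main(): Unit = runBlocking {
    startProcess(Processor()).collect { println(it) }
}
```

All three items should now be collected. Note that merge runs both flows concurrently, so the position of FINISHED relative to STARTED and IN_PROGRESS is not guaranteed; if the order matters, emitting the callback flow's items from inside the outer flow (for example with emitAll after the other emit calls) is one alternative.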
According to the callbackFlow docs:
awaitClose should be used to keep the flow running, otherwise the channel will be closed immediately when block completes. awaitClose argument is called either when a flow consumer cancels the flow collection or when a callback-based API invokes SendChannel.close manually and is typically used to cleanup the resources after the completion, e.g. unregister a callback. Using awaitClose is mandatory in order to prevent memory leaks when the flow collection is cancelled, otherwise the callback may keep running even when the flow collector is already completed. To avoid such leaks, this method throws IllegalStateException if block returns, but the channel is not closed yet.
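To illustrate the cleanup behaviour the docs describe, here is a small sketch in which the collector cancels early via take(1). The EventSource class and its register/unregister/fire methods are hypothetical, invented purely for this example; they stand in for any callback API that keeps a reference to its listeners.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback API, made up for this sketch.
class EventSource {
    private val listeners = mutableListOf<(String) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: (String) -> Unit) { listeners += listener }
    fun unregister(listener: (String) -> Unit) { listeners -= listener }
    fun fire(event: String) = listeners.toList().forEach { it(event) }
}

fun EventSource.events(): Flow<String> = callbackFlow {
    val listener: (String) -> Unit = { event -> trySend(event) }
    register(listener)
    // The block runs when the collector cancels or the channel is closed.
    awaitClose { unregister(listener) }
}

fun main(): Unit = runBlocking {
    val source = EventSource()
    val collector = launch {
        // take(1) cancels the upstream flow after the first item.
        source.events().take(1).collect { println("got $it") }
    }
    while (source.listenerCount == 0) yield() // wait until the listener is registered
    source.fire("a")
    source.fire("b") // reaches the listener, but is never collected
    collector.join()
    println("listeners left: ${source.listenerCount}")
}
```

Once the collector has finished, the listener count should be back to 0, because the awaitClose block ran during cancellation. Without awaitClose, nothing would ever call unregister, and the source would keep a reference to the listener.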