I'm starting to learn Kotlin Flow and Coroutines, but I don't know how to make the code below work. What am I doing wrong?
interface MessagesListener {
    fun onNewMessageReceived(message: String)
}

fun messages(): Flow<String> = flow {
    val messagesListener = object : MessagesListener {
        override fun onNewMessageReceived(message: String) {
            // The line below generates the error 'Suspension functions can be called only within coroutine body'
            emit(message)
        }
    }
    val messagesPublisher = MessagesPublisher(messagesListener)
    messagesPublisher.connect()
}
callbackFlow is a flow builder that lets you convert callback-based APIs into flows. As an example, the Firebase Firestore Android APIs use callbacks. Note: Starting in version 24.3.
StateFlow and LiveData have similarities. Both are observable data holder classes, and both follow a similar pattern when used in your app architecture. They do behave differently, however: StateFlow requires an initial state to be passed into the constructor, while LiveData does not.
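The initial-state requirement mentioned above can be seen in a minimal sketch like the following. The name `counter` is made up for illustration; only `MutableStateFlow` and its `value` property come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow

fun main() {
    // MutableStateFlow cannot be created without an initial value.
    val counter = MutableStateFlow(0)

    // The current state is always readable, even with no collectors.
    println(counter.value) // prints 0

    counter.value = 1
    println(counter.value) // prints 1
}
```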
Use channelFlow when values need to be emitted concurrently, for example from several coroutines launched inside the builder.
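As a rough illustration of concurrent emission, the sketch below launches two producers inside one channelFlow. The function name `mergedNumbers` and the number ranges are invented for this example; the interleaving of the two producers is not guaranteed, so the result is sorted before printing.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Two child coroutines send into the same flow.
// A plain flow { } builder would reject emissions from another coroutine.
fun mergedNumbers(): Flow<Int> = channelFlow<Int> {
    launch { for (i in 1..3) send(i) }   // first producer
    launch { for (i in 11..13) send(i) } // second producer
    // The flow completes once both child coroutines have finished.
}

fun main() = runBlocking {
    val values = mergedNumbers().toList()
    println(values.sorted()) // prints [1, 2, 3, 11, 12, 13]
}
```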
suspendCancellableCoroutine passes a CancellableContinuation to its block, on which you call resume or resumeWithException. It throws CancellationException if the coroutine's Job is cancelled while it is suspended. (There is a similar function called suspendCoroutine. The difference between them is that suspendCoroutine does not support cancellation through the Job.)
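For a one-shot callback (as opposed to a stream of messages), wrapping it with suspendCancellableCoroutine might look like the sketch below. `ResultCallback`, `FakeClient`, and `fetchSuspending` are hypothetical names invented here, not part of any real library.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical one-shot callback API, invented for this sketch.
interface ResultCallback {
    fun onSuccess(value: String)
    fun onError(error: Throwable)
}

// Fake client that answers immediately so the example can run on its own.
class FakeClient {
    fun fetch(callback: ResultCallback) = callback.onSuccess("hello")
    fun cancel() { /* nothing to cancel in this fake */ }
}

suspend fun FakeClient.fetchSuspending(): String =
    suspendCancellableCoroutine { continuation ->
        // Stop the underlying request if the calling coroutine is cancelled.
        continuation.invokeOnCancellation { cancel() }
        fetch(object : ResultCallback {
            override fun onSuccess(value: String) = continuation.resume(value)
            override fun onError(error: Throwable) = continuation.resumeWithException(error)
        })
    }

fun main() = runBlocking {
    println(FakeClient().fetchSuspending()) // prints hello
}
```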
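I believe you should be able to use callbackFlow, something like:
fun messages(): Flow<String> = callbackFlow {
    val messagesListener = object : MessagesListener {
        override fun onNewMessageReceived(message: String) {
            // trySend is not a suspend function, so it can be called from a plain callback
            trySend(message)
        }
    }
    val messagesPublisher = MessagesPublisher(messagesListener)
    messagesPublisher.connect()
    // callbackFlow requires awaitClose at the end of the block; it keeps the flow open until the collector stops.
    // disconnect() is hypothetical: use whatever cleanup call MessagesPublisher actually provides.
    awaitClose { messagesPublisher.disconnect() }
}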
What you are trying to achieve is not possible because emit is a suspend function, and onNewMessageReceived is an ordinary (non-suspending) callback.
However, you can use callbackFlow, which is designed for exactly this case: converting listeners/callbacks into coroutine Flows.
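fun messages() = callbackFlow<String> {
    val messagesListener = object : MessagesListener {
        override fun onNewMessageReceived(message: String) {
            // trySend replaces offer, which is deprecated since kotlinx.coroutines 1.5
            trySend(message)
        }
    }
    val messagesPublisher = MessagesPublisher(messagesListener)
    messagesPublisher.connect()
    // Without awaitClose, callbackFlow throws IllegalStateException when the block returns.
    // disconnect() is hypothetical: use whatever cleanup call MessagesPublisher actually provides.
    awaitClose { messagesPublisher.disconnect() }
}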
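To try the callbackFlow approach end to end, here is a self-contained sketch. The question does not show the real MessagesPublisher, so the class below is a stand-in written for this example: it delivers three messages synchronously from connect(), and its disconnect() method is an assumption.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

interface MessagesListener {
    fun onNewMessageReceived(message: String)
}

// Stand-in for the real MessagesPublisher, whose API is not shown in the question.
class MessagesPublisher(private val listener: MessagesListener) {
    fun connect() {
        // Deliver a few messages right away so the example terminates.
        listOf("a", "b", "c").forEach(listener::onNewMessageReceived)
    }

    fun disconnect() { /* hypothetical cleanup hook */ }
}

fun messages(): Flow<String> = callbackFlow {
    val listener = object : MessagesListener {
        override fun onNewMessageReceived(message: String) {
            // Non-suspending send; the result reports whether the buffer accepted the value.
            trySend(message)
        }
    }
    val publisher = MessagesPublisher(listener)
    publisher.connect()
    // Suspends until the collector stops, then runs the cleanup.
    awaitClose { publisher.disconnect() }
}

fun main() = runBlocking {
    // take(3) cancels the flow after three messages, which triggers awaitClose.
    println(messages().take(3).toList()) // prints [a, b, c]
}
```

Because the flow stays open until the collector goes away, a real collector would normally run in a scope that is cancelled when the messages are no longer needed.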