Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to emit data from an asycnhronous callback using Kotlin Flow?

I'm starting to learn Kotlin Flow and Coroutines but I do not know how to make the code below works. What am I doing wrong?

interface MessagesListener {
    fun onNewMessageReceived(message: String)
}

fun messages(): Flow<String> = flow {

    val messagesListener = object : MessagesListener {
        override fun onNewMessageReceived(message: String) {

            // The line below generates the error 'Suspension functions can be called only within coroutine body'

            emit(message)
        }
    }

    val messagesPublisher = MessagesPublisher(messagesListener)
    messagesPublisher.connect()
}
like image 724
Androider2 Avatar asked Oct 31 '20 16:10

Androider2


People also ask

What is callback flow in Kotlin?

callbackFlow is a flow builder that lets you convert callback-based APIs into flows. As an example, the Firebase Firestore Android APIs use callbacks. Note: Starting in version 24.3.

Why use flow instead of LiveData?

StateFlow and LiveData have similarities. Both are observable data holder classes, and both follow a similar pattern when used in your app architecture. The StateFlow and LiveData do behave differently: StateFlow requires an initial state to be passed into the constructor, while LiveData does not.

When should I use Channelflow?

We have to use channelFlows when we need concurrent flow emissions.

How do you use SuspendCancellableCoroutine?

SuspendCancellableCoroutine returns a CancellableContinuation for us to use resume, resumeWithException and throws CancellationException if the continuation is cancelled. (There is another similar function called suspendCoroutine . The difference between them is that suspendCoroutine cannot be cancelled by Job.


2 Answers

I believe you should be able to use callbackFlow ....something like:

fun messages(): Flow<String> = callbackFlow {

    val messagesListener = object : MessagesListener {
        override fun onNewMessageReceived(message: String) {
            trySend(message)
        }
    }

    val messagesPublisher = MessagesPublisher(messagesListener)
    messagesPublisher.connect()
}
like image 96
John O'Reilly Avatar answered Oct 21 '22 23:10

John O'Reilly


What you are trying to achieve is not possible because emit is a suspend function.

However, you can use callbackFlow which is designed for such cases to convert Listeners/Callbacks into Coroutine's Flows.

fun messages() = callbackFlow<String> {
    val messagesListener = object : MessagesListener {
        override fun onNewMessageReceived(message: String) {
            offer(message)
        }
    }
    val messagesPublisher = MessagesPublisher(messagesListener)
    messagesPublisher.connect()
}
like image 44
Giorgos Neokleous Avatar answered Oct 22 '22 00:10

Giorgos Neokleous