Unable to Execute code after Kotlin Flow collect

I'm trying to execute some code after calling collect on a Flow<MyClass>. I'm still fairly new to Flows, so I don't understand why the code after the collect call never runs.

How I use the Flow:

incidentListener = FirebaseUtils.databaseReference
                      .child(AppConstants.FIREBASE_PATH_AS)
                      .child(id)
                      .listen<MyClass>() //This returns a Flow<MyClass?>?

How I consume the Flow:

private suspend fun myFun() {
   viewmodel.getListener()?.collect { myClass->
       //do something here
   }
   withContext(Dispatchers.Main) { updateUI() } //the code never reaches this part
}

How myFun() is called:

CoroutineScope(Dispatchers.IO).launch {
   myFun()
}

So far I've tried closing the coroutine context, but that didn't work. I'm assuming Flows work differently from regular coroutines.

Update:

I'm listening through Firebase using this block of code. I don't know if it'll help but maybe the way I implemented it is causing the issue?

inline fun <reified T> Query.listen(): Flow<T?>? =
callbackFlow {
    val valueListener = object : ValueEventListener {
        override fun onCancelled(databaseError: DatabaseError) {
            close()
        }

        override fun onDataChange(dataSnapshot: DataSnapshot) {
            try {
                val value = dataSnapshot.getValue(T::class.java)
                offer(value)
            } catch (exp: Exception) {
                if (!isClosedForSend) offer(null)
            }
        }
    }
    addValueEventListener(valueListener)
    awaitClose { removeEventListener(valueListener) }
}
kobowo asked Mar 19 '20 17:03

1 Answer

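collect is a suspending function, so the code after collect only runs once the flow completes. A flow built with callbackFlow and awaitClose stays open until its channel is closed or the collector is cancelled, so here collect never returns on its own.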
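As a minimal sketch of that behavior, assuming kotlinx.coroutines 1.6 or later: finiteFlow and collectThenContinue are hypothetical names, not part of the question's code. The flow here completes after three emissions, which is the only reason the line after collect is reached.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical finite flow: emits three values, then completes.
fun finiteFlow(): Flow<Int> = flow {
    for (i in 1..3) emit(i)
}

// Records the order of events so the suspension point is visible.
suspend fun collectThenContinue(): List<String> {
    val events = mutableListOf<String>()
    // Suspends here until finiteFlow() has emitted everything and completed.
    finiteFlow().collect { value -> events += "collected $value" }
    // Reached only because the flow above completes.
    events += "after collect"
    return events
}

fun main() = runBlocking<Unit> {
    // Prints "collected 1", "collected 2", "collected 3", then "after collect".
    collectThenContinue().forEach { println(it) }
}
```

If finiteFlow were replaced by a flow that never completes, such as a listener-backed callbackFlow, "after collect" would never be recorded.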

Launch the collection in a separate coroutine:

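private suspend fun myFun() {
   coroutineScope {
       // Collect in a child coroutine so the rest of this block is not held up.
       launch {
           viewmodel.getListener()?.collect { myClass ->
               // do something here
           }
       }
       // Runs right away, without waiting for the flow to complete.
       withContext(Dispatchers.Main) { updateUI() }
       // Note: coroutineScope itself returns only once the collector
       // finishes or is cancelled.
   }
}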
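The same pattern can be tried without Firebase. The sketch below is an illustration under stated assumptions, not the asker's code: openEndedFlow is a hypothetical stand-in for the listener-backed flow, and collectInBackground, events, and secondValueSeen are made-up names. It assumes kotlinx.coroutines 1.6 or later and is run from runBlocking, so everything executes on one thread.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the Firebase listener: sends two values and then
// stays open until the collector is cancelled.
fun openEndedFlow(): Flow<Int> = callbackFlow {
    trySend(1)
    trySend(2)
    awaitClose { /* a real implementation would remove its listener here */ }
}

suspend fun collectInBackground(): List<String> = coroutineScope {
    val events = mutableListOf<String>()
    val secondValueSeen = CompletableDeferred<Unit>()

    // Collect in a child coroutine so the code below is not held up.
    val collector = launch {
        openEndedFlow().collect { value ->
            events += "collected $value"
            if (value == 2) secondValueSeen.complete(Unit)
        }
        events += "flow completed" // not reached: the flow never completes
    }

    // Runs before any value is collected; the child has not started yet.
    events += "after launch"

    // Wait for both values, then cancel the collector so that
    // coroutineScope (and this function) can return.
    secondValueSeen.await()
    collector.cancelAndJoin()
    events
}

fun main() = runBlocking<Unit> {
    collectInBackground().forEach { println(it) }
}
```

The explicit cancelAndJoin matters: without it, coroutineScope would wait for the collector forever, just as a direct collect call would. In an Android app, launching the collector in a lifecycle-bound scope usually takes care of that cancellation.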
Francesc answered Oct 17 '22 03:10