I have a StateFlow
coroutine that is shared amongst various parts of my application. When I cancel
the CoroutineScope
of a downstream collector, a JobCancellationException
is propagated up to the StateFlow
, and it stops emitting values for all current and future collectors.
The StateFlow
:
val songsRelay: Flow<List<Song>> by lazy {
MutableStateFlow<List<Song>?>(null).apply {
CoroutineScope(Dispatchers.IO)
.launch { songDataDao.getAll().distinctUntilChanged().collect { value = it } }
}.filterNotNull()
}
A typical 'presenter' in my code implements the following base class:
abstract class BasePresenter<T : Any> : BaseContract.Presenter<T> {
var view: T? = null
private val job by lazy {
Job()
}
private val coroutineScope by lazy { CoroutineScope( job + Dispatchers.Main) }
override fun bindView(view: T) {
this.view = view
}
override fun unbindView() {
job.cancel()
view = null
}
fun launch(block: suspend CoroutineScope.() -> Unit): Job {
return coroutineScope.launch(block = block)
}
}
A BasePresenter
implementation might call launch{ songsRelay.collect {...} }
When the presenter is unbound, in order to prevent leaks, I cancel the parent job. Any time a presenter that was collecting the songsRelay
StateFlow
is unbound, the StateFlow
is essentially terminated with a JobCancellationException
, and no other collectors/presenters can collect values from it.
I've noticed that I can call job.cancelChildren()
instead, and this seems to work (StateFlow
doesn't complete with a JobCancellationException
). But then I wonder what the point is of declaring a parent job
, if I can't cancel the job itself. I could just remove job
altogether, and call coroutineScope.coroutineContext.cancelChildren()
to the same effect.
If I do just call job.cancelChildren()
, is that sufficient? I feel like by not calling coroutineScope.cancel()
, or job.cancel()
, I may not be correctly or completely cleaning up the tasks that I have kicked off.
I also don't understand why the JobCancellationException
is propagated up the hierarchy when job.cancel()
is called. Isn't job
the 'parent' here? Why does cancelling it affect my StateFlow
?
UPDATE:
Are you sure your songRelay
is actually getting cancelled for all presenters? I ran this test and "Song relay completed" is printed, because onCompletion
also catches downstream exceptions. However Presenter 2 emits the value 2 just fine, AFTER song relay prints "completed". If I cancel Presenter 2, "Song relay completed" is printed again with a JobCancellationException for Presenter 2's job.
I do find it interesting how the one flow instance will emit once each for each collector subscribed. I didn't realize that about flows.
val songsRelay: Flow<Int> by lazy {
MutableStateFlow<Int?>(null).apply {
CoroutineScope(Dispatchers.IO)
.launch {
flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}.onCompletion {
println("Dao completed")
}.collect { value = it }
}
}.filterNotNull()
.onCompletion { cause ->
println("Song relay completed: $cause")
}
}
@Test
fun test() = runBlocking {
val job = Job()
val presenterScope1 = CoroutineScope(job + Dispatchers.Unconfined)
val presenterScope2 = CoroutineScope(Job() + Dispatchers.Unconfined)
presenterScope1.launch {
songsRelay.onCompletion { cause ->
println("Presenter 1 Completed: $cause")
}.collect {
println("Presenter 1 emits: $it")
}
}
presenterScope2.launch {
songsRelay.collect {
println("Presenter 2 emits: $it")
}
}
presenterScope1.cancel()
delay(2000)
println("Done test")
}
I think you need to use SupervisorJob
in your BasePresenter instead of Job
. In general using Job
would be a mistake for the whole presenter, because one failed coroutine will cancel all coroutines in the Presenter. Generally not what you want.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With