Update Coroutines 1.3.0-RC
Working version:
    @FlowPreview
    suspend fun streamTest(): Flow<String> = channelFlow {
        listener.onSomeResult { result ->
            if (!isClosedForSend) {
                offer(result)
            }
        }
        awaitClose {
            listener.unsubscribe()
        }
    }
Also check out this Medium article by Roman Elizarov: Callbacks and Kotlin Flows
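The same pattern can be tried end to end with a stand-in listener. The following is a minimal sketch, not the original service code: FakeListener and firstValueAndCleanup are hypothetical names, and it assumes a recent kotlinx.coroutines release, where callbackFlow is available and trySend is the replacement for the deprecated offer. Collection stops after the first value, and the awaitClose block is where the listener gets unsubscribed.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the listener used in the question.
class FakeListener {
    var unsubscribed = false
        private set

    // For demo purposes, delivers three values as soon as a callback is registered.
    fun onSomeResult(cb: (String) -> Unit) {
        listOf("first", "second", "someString").forEach(cb)
    }

    // Records that the subscription was released.
    fun unsubscribe() {
        unsubscribed = true
    }
}

fun streamTest(listener: FakeListener): Flow<String> = callbackFlow {
    listener.onSomeResult { result ->
        // Non-suspending send; the returned result reports failure if the channel is closed.
        trySend(result)
    }
    // Suspends until the collector goes away, then cleans up the listener.
    awaitClose { listener.unsubscribe() }
}

// Collects a single value; first() stops collecting once it has received one.
fun firstValueAndCleanup(): Pair<String, Boolean> {
    val listener = FakeListener()
    val value = runBlocking { streamTest(listener).first() }
    return value to listener.unsubscribed
}

fun main() {
    val (value, unsubscribed) = firstValueAndCleanup()
    println("value=$value unsubscribed=$unsubscribed")
}
```

Any way of ending the collection (cancelling the collecting job, first(), takeWhile) should reach the awaitClose block, which is why the cleanup belongs there rather than at the call site.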
Original Question
I have a Flow emitting multiple Strings:
    @FlowPreview
    suspend fun streamTest(): Flow<String> = flowViaChannel { channel ->
        listener.onSomeResult { result ->
            if (!channel.isClosedForSend) {
                channel.sendBlocking(result)
            }
        }
    }
After some time I want to unsubscribe from the stream. Currently I do the following:
    viewModelScope.launch {
        beaconService.streamTest().collect {
            Timber.i("stream value $it")
            if (it == "someString") {
                // Here the coroutine gets canceled, but streamTest is still executed
                this.cancel()
            }
        }
    }
If the coroutine gets canceled, the stream keeps running; there is just no subscriber listening for new values. How can I unsubscribe and stop the stream function?
One solution is to cancel not the flow, but the scope (or job) it is collected in.
    val job = scope.launch {
        flow.cancellable().collect { }
    }
    job.cancel()
NOTE: You should call cancellable() before collect if you want your collector to stop when the Job is canceled.
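To see what cancellable() changes, here is a small sketch based on the behaviour described in the kotlinx.coroutines documentation: a flow created with IntRange.asFlow() does not check for cancellation between emissions on its own, while flows built with the flow { } builder already do. The function name collectUntilCancelled is made up for this example, and it assumes kotlinx.coroutines 1.4.0 or later, where cancellable() is available.

```kotlin
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Collects 1..5 and cancels the collecting coroutine when it sees 3.
// Returns the values that were actually collected.
fun collectUntilCancelled(useCancellable: Boolean): List<Int> {
    val seen = mutableListOf<Int>()
    runBlocking {
        val job = launch {
            val source = (1..5).asFlow()
            val flow = if (useCancellable) source.cancellable() else source
            flow.collect { value ->
                seen += value
                if (value == 3) cancel() // cancels the launched coroutine, not the flow object
            }
        }
        job.join()
    }
    return seen
}

fun main() {
    println("with cancellable():    ${collectUntilCancelled(useCancellable = true)}")
    println("without cancellable(): ${collectUntilCancelled(useCancellable = false)}")
}
```

With cancellable() the collector should stop after 3; without it, the busy loop inside asFlow() is expected to run through all five values before the cancellation is noticed.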
You could use the takeWhile operator on Flow.
    flow.takeWhile { it != "someString" }.collect { emittedValue ->
        // Do stuff until predicate is false
    }
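A runnable version of the same idea, as a minimal sketch: flowOf stands in for the real stream, and collectUntil is a hypothetical helper name. Collection ends as soon as the predicate returns false, so the stop value itself and everything after it are never delivered to the collector.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Collects values until `stop` is seen; the stop value is not included.
fun collectUntil(stop: String): List<String> = runBlocking {
    flowOf("first", "second", "someString", "ignored")
        .takeWhile { it != stop }
        .toList()
}

fun main() {
    println(collectUntil("someString")) // prints [first, second]
}
```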