Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kotlin Flow: How to unsubscribe/stop

Update Coroutines 1.3.0-RC

Working version:

@FlowPreview suspend fun streamTest(): Flow<String> = channelFlow {     listener.onSomeResult { result ->         if (!isClosedForSend) {             offer(result)         }     }      awaitClose {         listener.unsubscribe()     } } 

Also checkout this Medium article by Roman Elizarov: Callbacks and Kotlin Flows

Original Question

I have a Flow emitting multiple Strings:

@FlowPreview suspend fun streamTest(): Flow<String> = flowViaChannel { channel ->     listener.onSomeResult { result ->             if (!channel.isClosedForSend) {                 channel.sendBlocking(result)             }     } } 

After some time I want to unsubscribe from the stream. Currently I do the following:

viewModelScope.launch {     beaconService.streamTest().collect {         Timber.i("stream value $it")         if(it == "someString")             // Here the coroutine gets canceled, but streamTest is still executed             this.cancel()      } } 

If the coroutine gets canceled, the stream is still executed. There is just no subscriber listening to new values. How can I unsubscribe and stop the stream function?

like image 913
devz Avatar asked Jul 18 '19 06:07

devz


2 Answers

A solution is not to cancel the flow, but the scope it's launched in.

val job = scope.launch { flow.cancellable().collect { } } job.cancel() 

NOTE: You should call cancellable() before collect if you want your collector stop when Job is canceled.

like image 190
Cristan Avatar answered Sep 16 '22 14:09

Cristan


You could use the takeWhile operator on Flow.

flow.takeWhile { it != "someString" }.collect { emittedValue ->          //Do stuff until predicate is false          } 
like image 33
Ronald Avatar answered Sep 17 '22 14:09

Ronald