
How do you split a 'hot' stream of events from a callback in Kotlin?

I'm processing a hot stream of events that arrive by callback. Downstream, I'd like to split it into multiple streams and process each of them. The events all arrive sequentially from a single thread (which I don't control, so I don't think I can use coroutines here). What is the right structure to use here?

I can create a Flow pretty easily using callbackFlow and sendBlocking, but the semantics don't seem to line up, as the Flow isn't cold. What is the best way to split a flow into multiple downstream flows (depending on the contents of the events)? Or should I use channels? That matches the 'hotness' of my source, but the whole polling downstream seems off (in this basically synchronous situation), and a lot of the methods seem deprecated in favor of Flow.

I can do all this by just using 'callbacks all the way', but that creates much tighter coupling than I'd like. Any ideas?

Edit:

I ended up with this, seems to work:

    fun testFlow() {
        runBlocking {
            val original = flowOf("aap", "noot", "mies", "wim", "zus","jet","weide","does")
            val broadcast = original.broadcastIn(this)
            val flow1 = broadcast.openSubscription().receiveAsFlow().filter { it.length == 4 }
            val flow2 = broadcast.openSubscription().receiveAsFlow().filter { it.length == 3 }
            flow1.collect { it -> println("Four letter: ${it}") }
            flow2.collect { it -> println("Three letter: ${it}") }
        }
    }
Frank Lee asked Dec 05 '22 08:12



1 Answer

Short answer

There will soon be a hot SharedFlow for this use case, but in the meantime you can use a BroadcastChannel under the hood.

You can start with callbackFlow to create a cold flow from a callback-based API (see Roman Elizarov's post about it). Then use the following to make it hot and share it:

    val original: Flow<String> = TODO("get original flow")

    // create an implicit hot BroadcastChannel, shared between collectors
    val sharedFlow = original.broadcastIn(scope).asFlow()

    // create derived cold flows, which will subscribe (on collect) to the
    // same hot source (BroadcastChannel)
    val flow1 = sharedFlow.filter { it.length == 4 }
    val flow2 = sharedFlow.filter { it.length == 3 }.map { it.toUpperCase() }

    // collect suspends until its flow completes, so each collector gets its own
    // coroutine; collecting sequentially would subscribe flow2 only after flow1
    // has finished, by which time the hot source has nothing left to deliver
    scope.launch { flow1.collect { println("Four letter: $it") } }
    scope.launch { flow2.collect { println("Three letter: $it") } }
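The callbackFlow step mentioned above could be sketched roughly as follows. `EventSource`, its `register`/`unregister` methods and the `eventFlow` extension are hypothetical stand-ins for the real callback API, and `trySend` assumes kotlinx.coroutines 1.5 or later (the question's `sendBlocking` played this role in older versions).

```kotlin
import kotlin.concurrent.thread
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical callback API: once a listener is registered, it delivers
// its events sequentially on a thread of its own.
class EventSource(private val events: List<String>) {
    fun register(listener: (String) -> Unit) {
        thread { events.forEach(listener) }
    }

    fun unregister() { /* a real source would drop the listener here */ }
}

// Wraps the callback API in a cold flow: the listener is registered when
// collection starts and removed when the collector goes away.
fun EventSource.eventFlow(): Flow<String> = callbackFlow {
    register { event ->
        // Non-suspending send, callable from a thread that is not a coroutine.
        // The result is ignored here, so an event is dropped if the buffer is full.
        trySend(event)
    }
    // Keeps the flow open until the collector cancels, then cleans up.
    awaitClose { unregister() }
}

fun collectThree(): List<String> = runBlocking {
    EventSource(listOf("aap", "noot", "mies")).eventFlow().take(3).toList()
}

fun main() {
    println(collectThree())
}
```

The resulting flow is still cold: every collector triggers its own `register` call, which is why the sharing step is needed before splitting it.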

Making a flow hot (the current way)

To clarify first: although Flows are mostly cold for now, there is already a hot StateFlow, and there will soon be a convenient share operator and a hot SharedFlow to simplify this kind of use case.

While we wait for this, if you start with a cold Flow, you currently have to first create a hot channel (and a coroutine that sends elements to it), from which you then derive flows that share the hot source. This can easily be done in one of these ways:

  • Flow.produceIn(scope) launches a coroutine in the given scope and gives you a ReceiveChannel (useful for fan-out, see below)
  • Flow.broadcastIn(scope) launches a coroutine in the given scope and gives you a BroadcastChannel (useful for actual sharing, see below)
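As a rough illustration of the fan-out behaviour of `produceIn`, the sketch below lets two consumers read from the same ReceiveChannel. The `fanOut` function and the word list are only for illustration, and `produceIn` has been flagged as a preview API in some kotlinx.coroutines versions. Which consumer receives which word is not fixed, so the only property relied on is that each word is delivered exactly once overall.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.runBlocking

// Returns the elements seen by each of two consumers of one ReceiveChannel.
fun fanOut(): List<List<String>> = runBlocking {
    val original = flowOf("aap", "noot", "mies", "wim", "zus", "jet", "weide", "does")

    // Launches a coroutine in this scope that collects the flow into a channel.
    val channel = original.produceIn(this)

    // Both consumers iterate over the same channel; every element goes to
    // exactly one of them, and both loops end when the channel is closed.
    val consumers = List(2) {
        async {
            val seen = mutableListOf<String>()
            for (word in channel) seen += word
            seen
        }
    }
    consumers.awaitAll()
}

fun main() {
    val (first, second) = fanOut()
    println("first consumer:  $first")
    println("second consumer: $second")
}
```

This is the opposite of what the question asks for: splitting by content needs every downstream flow to see every event, which is why the broadcast variant is the one used in the short answer.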

Once you have a hot channel, you can convert it into a flow and get different behaviours:

  • ReceiveChannel.consumeAsFlow() creates a Flow from a hot source, but it can only be collect-ed by a single collector (throws otherwise)
  • ReceiveChannel.receiveAsFlow() creates a multi-collector Flow, but it behaves in a fan-out fashion (each element from the source channel only goes to one consumer)
  • BroadcastChannel.asFlow() creates a multi-collector Flow where each collector gets all elements (which is effectively sharing). Calling collect creates a new subscription on the BroadcastChannel, and handles cancellation properly.
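For comparison with the channel-based options above, here is a minimal sketch of the same split on a kotlinx.coroutines version that already ships SharedFlow (1.4.0 or later, as far as I know). The `splitShared` function is hypothetical, and the `take(words.size)` calls are only there so the example terminates, since a shared flow never completes by itself.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun splitShared(): Pair<List<String>, List<String>> = runBlocking {
    val words = listOf("aap", "noot", "mies", "wim", "zus", "jet", "weide", "does")

    // Hot source: values go to whoever is subscribed at that moment, no replay.
    val shared = MutableSharedFlow<String>()

    // Each derived flow sees every event and keeps only the ones it wants.
    val four = async { shared.take(words.size).filter { it.length == 4 }.toList() }
    val three = async { shared.take(words.size).filter { it.length == 3 }.toList() }

    // Wait until both collectors are subscribed; events emitted earlier
    // would simply be missed, which is what "hot" means here.
    shared.subscriptionCount.first { it == 2 }
    words.forEach { shared.emit(it) }

    four.await() to three.await()
}

fun main() {
    val (four, three) = splitShared()
    println("Four letter: $four")
    println("Three letter: $three")
}
```

In the callback scenario the emitting side would be the callback-backed flow rather than a loop over a list; the point of the sketch is only that both filters observe the full stream.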

"Latest state" semantics with StateFlow

This is not your use case, but sometimes you don't need every value in a flow, only the latest state and subsequent state updates.

This used to be done via a ConflatedBroadcastChannel, but you can now use a StateFlow to represent this (since coroutines 1.3.6):

  • on the producer side, set the value of a MutableStateFlow
  • on the consumer side, each collector will get the current state when they start, and then get a new state value each time it is different from the previous one (based on equality).
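The two sides described above can be sketched as follows. The `latestState` function and the state names are made up for the example; it only shows that a collector arriving late starts from the current value rather than from the history.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Returns the current value and the first value seen by a late collector.
fun latestState(): Pair<String, String> = runBlocking {
    // A StateFlow always holds a value, so it needs an initial one.
    val state = MutableStateFlow("idle")

    // Producer side: just assign the new state.
    state.value = "loading"
    state.value = "ready"

    // Consumer side: a collector that starts now receives the current
    // state first; the earlier "idle" and "loading" values are gone.
    val seenByLateCollector = state.first()

    state.value to seenByLateCollector
}

fun main() {
    println(latestState())
}
```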
Joffrey answered Jan 11 '23 03:01
