I'm processing a hot stream of events, arriving by callback. 'Downstream' I'd like to split it into multiple streams, and process them.The events all arrive sequentially from a single thread (which I don't control, so I don't think I can use co routines here) What is the right structure to use here?
I can create a Flow pretty easily, using callbackFlow and sendBlocking, but the semantics don't seem to line up, as the Flow isn't cold. What is the best way to split a flow into multiple downstream flows (depending on the contents of events). Or should I use channels? It matches the 'hotness' of my source, but the whole polling downstream seems off (in this basically synchronoussituation), and a lot of the methods seem deprecated in favor of Flow.
I can do all this by just using 'callbacks all the way' but that creates a lot tighter coupling than I'd like. Any ideas?
Edit:
I ended up with this, seems to work:
fun testFlow() {
runBlocking {
val original = flowOf("aap", "noot", "mies", "wim", "zus","jet","weide","does")
val broadcast = original.broadcastIn(this)
val flow1 = broadcast.openSubscription().receiveAsFlow().filter { it.length == 4 }
val flow2 = broadcast.openSubscription().receiveAsFlow().filter { it.length == 3 }
flow1.collect { it -> println("Four letter: ${it}") }
flow2.collect { it -> println("Three letter: ${it}") }
}
}
There will soon be a hot SharedFlow
for this use case, but in the meantime you can use a BroadcastChannel
under the cover.
You can start with callbackFlow
to create a cold flow from a callback based API (see Roman Elizarov's post about it).
Then use the following to make it hot and share it:
val original: Flow<String> = TODO("get original flow")
// create an implicit hot BroadcastChannel, shared between collectors
val sharedFlow = original.broadcastIn(scope).asFlow()
// create derived cold flows, which will subscribe (on collect) to the
// same hot source (BroadcastChannel)
val flow1 = sharedFlow.filter { it.length == 4 }
val flow2 = sharedFlow.filter { it.length == 3 }.map { it.toUppercase() }
flow1.collect { it -> println("Four letter: ${it}") }
flow2.collect { it -> println("Three letter: ${it}") }
First to clarify, even if Flow
s are mostly cold for now, there is already a hot StateFlow
, and there will soon be a convenient share
operator and a hot SharedFlow
to simplify this kind of use case.
While we wait for this, if you initially have a cold Flow
, you currently have to first create a hot channel (and a coroutine to send elements to it) from which we derive flows sharing the hot source. This can easily be done in one of these ways:
Flow.produceIn(scope)
launches a coroutine in the given scope and gives you a ReceiveChannel
(useful for fan-out, see below)Flow.broadcastIn(scope)
launches a coroutine in the given scope and gives you a BroadcastChannel
(useful for actual sharing, see below)Once you have a hot channel, you can convert it into a flow and get different behaviours:
ReceiveChannel.consumeAsFlow()
creates a Flow
from a hot source, but it can only be collect
-ed by a single collector (throws otherwise)ReceiveChannel.receiveAsFlow()
creates a multi-collector Flow
, but it behaves in a fan-out fashion (each element form the source channel only goes to one consumer)BroadcastChannel.asFlow()
creates a multi-collector Flow
where each collector gets all elements (which is effectively sharing). Calling collect
creates a new subscription on the BroadcastChannel
, and handles cancellation properly.StateFlow
This is not your use case, but sometimes you may not want necessarily all values in a flow, but rather the latest current state and state updates.
This used to be done via a ConflatedBroadcastChannel
, but you can now use a StateFlow
to represent this (since coroutines 1.3.6):
MutableStateFlow
equal
ity).If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With