I have the following simple case class hierarchy:
sealed trait Message
case class Foo(bar: Int) extends Message
case class Baz(qux: String) extends Message
And I have a Flow[Message, Message, NotUsed] (from a Websocket-based protocol with codec already in place).
I want to demultiplex this Flow[Message] into separate flows for Foo and Baz types, as those are processed by completely different pathways.
What is the simplest way of doing that? Should be obvious, but I am missing something...
One way is to create a RunnableGraph that includes the Flows for each type of message, with a Partition stage routing each message to the matching Flow.
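import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, RunnableGraph, Sink, Source}

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._ // brings the ~> operator into scope
  val in = Source(...) // Some message source
  val out = Sink.ignore
  // collect narrows the element type from Message to the concrete subtype
  val foo = builder.add(Flow[Message].collect { case f: Foo => f })
  val baz = builder.add(Flow[Message].collect { case b: Baz => b })
  // Partition sends each element to the output port whose index the function returns
  val partition = builder.add(Partition[Message](2, {
    case Foo(_) => 0
    case Baz(_) => 1
  }))
  in ~> partition.in
  partition.out(0) ~> foo ~> /* other Flow[Foo] here ~> */ out
  partition.out(1) ~> baz ~> /* other Flow[Baz] here ~> */ out
  ClosedShape
})
g.run()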
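For reference, here is a minimal self-contained sketch of the same Partition idea that can be run as-is. It assumes Akka 2.6.x (where an implicit ActorSystem also supplies the materializer). The DemuxExample object, the demux helper, and the sample messages are hypothetical names made up for illustration. Sink.seq stands in for the real Foo and Baz processing pathways so the routed elements can be inspected.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

sealed trait Message
case class Foo(bar: Int) extends Message
case class Baz(qux: String) extends Message

object DemuxExample {
  // Hypothetical helper: routes a fixed list of messages into two typed sinks
  // and returns the collected elements of each branch as futures.
  def demux(messages: immutable.Seq[Message])(
      implicit system: ActorSystem): (Future[immutable.Seq[Foo]], Future[immutable.Seq[Baz]]) = {
    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[Foo], Sink.seq[Baz])((_, _)) { implicit b => (foos, bazs) =>
        import GraphDSL.Implicits._
        // The partitioner returns the index of the output port for each element
        val partition = b.add(Partition[Message](2, {
          case _: Foo => 0
          case _: Baz => 1
        }))
        Source(messages) ~> partition.in
        // collect narrows each branch from Message to its concrete subtype
        partition.out(0).collect { case f: Foo => f } ~> foos
        partition.out(1).collect { case z: Baz => z } ~> bazs
        ClosedShape
      })
    graph.run()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("demux")
    val (foos, bazs) = demux(List(Foo(1), Baz("a"), Foo(2)))
    println(Await.result(foos, 3.seconds)) // the Foo elements, in arrival order
    println(Await.result(bazs, 3.seconds)) // the Baz elements, in arrival order
    system.terminate()
  }
}
```

In a real pipeline the two Sink.seq stages would be replaced by the Foo-specific and Baz-specific flows and sinks. Using collect after each Partition port gives those downstream stages a Flow of Foo or Baz rather than of Message.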