I have a stream like this and two sinks, but only one is used at a time:
Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink1)
or
Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink2)
Which sink we use is configurable, but what if I have to use both sinks in parallel? How can I do that?
I thought about Sink.combine, but it also requires a merge strategy, and I don't want to combine the results of these sinks in any way. I don't really care about them; I only want to send the same data via HTTP to some endpoint while at the same time sending it to a database. Sink.combine is very similar to a broadcast, but implementing a broadcast from scratch decreases the readability of my code, which currently has only a simple source, a flow and a sink, with no low-level graph stages.
Do you know a proper way to do that (keeping backpressure and the other things I get when using only one sink)?
You can use alsoTo (see API docs):
Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore)
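Applied to the pipeline from the question, this could look like the minimal sketch below. It assumes Akka 2.6 or later (where an implicit ActorSystem provides the materializer). The flow and both sinks are hypothetical stand-ins for the real HTTP and database sinks, and Source(List(1, 2, 3)) is used in place of the question's Source.fromElements. alsoTo backpressures when either the attached sink or the downstream backpressures, so the slower sink sets the pace.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object AlsoToExample extends App {
  implicit val system: ActorSystem = ActorSystem("also-to-example")

  // Hypothetical stand-ins for the question's flow, HTTP sink and database sink.
  val flow  = Flow[Int].map(_ * 2)
  val sink1 = Sink.foreach[Int](x => println(s"http: $x"))
  val sink2 = Sink.foreach[Int](x => println(s"db: $x"))

  // Every element leaving `flow` is sent to sink1 and also to sink2.
  // Only sink2's materialized value (a Future[Done]) is kept here.
  val done = Source(List(1, 2, 3))
    .via(flow)
    .alsoTo(sink1)
    .runWith(sink2)

  // Wait for sink2 to finish, then shut the actor system down.
  Await.result(done, 5.seconds)
  system.terminate()
}
```

If the materialized value of the first sink is needed as well, alsoToMat lets you choose how the two values are combined.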
Broadcasting using GraphDSL in its simplest form shouldn't decrease readability; in fact, one might even argue that the ~> clauses in some way help visualize the stream structure:
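import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Source}

// flow, sink1 and sink2 are the ones from the question
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  // Fan every element out to two outputs
  val bcast = builder.add(Broadcast[Int](2))

  Source.fromElements(1, 2, 3) ~> flow ~> bcast.in
  bcast.out(0) ~> sink1
  bcast.out(1) ~> sink2
  ClosedShape
})
graph.run() // needs an implicit materializer (or ActorSystem) in scope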
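As for the Sink.combine mentioned in the question: the function it takes is a fan-out strategy, not a merge strategy, so passing Broadcast gives the same behaviour as the graph above without using GraphDSL directly. The following is only a sketch, assuming Akka 2.6 or later; the flow and sinks are hypothetical stand-ins, and Source(List(1, 2, 3)) replaces the question's Source.fromElements.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, Sink, Source}

object SinkCombineSketch {
  // Hypothetical stand-ins for the question's flow, HTTP sink and database sink.
  val flow  = Flow[Int].map(_ * 2)
  val sink1 = Sink.foreach[Int](x => println(s"http: $x"))
  val sink2 = Sink.foreach[Int](x => println(s"db: $x"))

  // Broadcast[Int](_) is the fan-out strategy: each element goes to both sinks.
  // The combined sink materializes NotUsed, so the sinks' own values are dropped.
  val both: Sink[Int, NotUsed] =
    Sink.combine(sink1, sink2)(Broadcast[Int](_))

  // Runs the question's pipeline against the combined sink.
  def run()(implicit system: ActorSystem): NotUsed =
    Source(List(1, 2, 3)).via(flow).runWith(both)
}
```

This keeps the source, flow, sink shape of the original code; the trade-off is that the combined sink does not expose the completion futures of sink1 and sink2.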