Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Multiple sinks in the same stream

I have a stream like this and two sinks, but only one is used at a time:

Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink1)

or

Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink2)

It is configurable which sink we use, but what if I have use both sinks in parallel. How can I do that?

I thought about Sink.combine, but it requires also a merge strategy and I don't want to combine results of these sinks in any way. I don't really care about them, so I want to only send the same data via HTTP to some endpoint while in the same time send them to database. Sink combine is very similar to broadcast, but implementing a broadcast from the scratch decreases readability of my code where now I have only simple source, flow and a sink, no low-level graph stages.

Do you know a proper way how to do that (having backpressure and other things that I have using only one sink)?

like image 741
Piotr Kozlowski Avatar asked Dec 19 '17 22:12

Piotr Kozlowski


2 Answers

You can use alsoTo (see API docs):

Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore)
like image 178
Sebastian Avatar answered Nov 09 '22 01:11

Sebastian


Broadcasting using GraphDSL in its simplest form shouldn't decrease readability – in fact, one might even argue that the ~> clauses in some way help visualize the stream structure:

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val bcast = builder.add(Broadcast[Int](2))

  Source.fromElements(1, 2, 3) ~> flow ~> bcast.in
  bcast.out(0) ~> sink1
  bcast.out(1) ~> sink2

  ClosedShape
})
graph.run()
like image 4
Leo C Avatar answered Nov 09 '22 00:11

Leo C