Let's take a very simple case:
Source(1 to 10)
.alsoTo(Sink.foreach(v => println(s"each: $v")))
.toMat(Sink.head)(Keep.right)
.run()
According to alsoTo documentation, I would expect Sink.foreach to print all the elements, however, it only prints first. Same happens if I switch Sink.foreach and Sink.head places.
If broadcast is implemented via GraphDSL,however , the entire source is consumed even if one of the sinks is Sink.head.
EDIT:
Documentation for alsoTo states the following:
Attaches the given Sink to this Flow, meaning that elements that pass through this Flow will also be sent to the Sink.
To me this sounds like a broadcast, but maybe this is where I make the mistake. I could also interpret that toMat controls the flow. So, I would expect the following to take all elements from the source:
Source(1 to 10)
.alsoTo(Sink.head)
.toMat(Sink.seq)(Keep.right)
.run()
GraphDSL version works as I would expect:
val s1 = Sink.foreach[Int](v => println(s"each: $v"))
val s2 = Sink.head[Int]
val source = Source(1 to 10)
RunnableGraph.fromGraph(GraphDSL.create(s1, s2)((_, _)) { implicit builder => (s1, s2) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
source ~> broadcast.in
broadcast.out(0) ~> s1
broadcast.out(1) ~> s2
ClosedShape
}).run()
The reason is that Sink.head consumes single element and completes itself. This is propagated upstream in the form of a cancel and no elements are sent from the Source after this.
Code from akka.stream.impl.HeadOptionStage.onPush shows it
def onPush(): Unit = {
p.trySuccess(Option(grab(in)))
completeStage()
}
Where completeStage
Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called, then marks the operator as stopped.
Update
alsoTo is a broadcast that is configured with these parameters:
val bcast = b.add(Broadcast[Out](2, eagerCancel = true))
Your GraphDSL version works differently because by default broadcast is eagerCancel = false.
Where eagerCancel
if true, broadcast cancels upstream if any of its downstreams cancel.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With