I have a function returning a Flow whose logic involves passing some elements of the graph to an auxiliary Sink passed as a parameter. I want to retain the auxiliary Sink's materialized value so that I can act on it when the constructed stream is launched.
Here's a rough picture of the flow I'm building:
IN ~> (logic: In => Out) ~> Broadcast ~> AuxFilter ~> AuxSink
                                      ~> OutFilter ~> OUT
Sample code:
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink}
import scala.util.Random

case class Incoming()
trait Element
case class Outcoming() extends Element
case class Persistent() extends Element

def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, NotUsed] = {
  // Keeps only the Persistent elements (fed to the auxiliary sink)
  val isPersistent = Flow[Element].collect {
    case persistent: Persistent => persistent
  }
  // Keeps only the Outcoming elements (emitted by the resulting flow)
  val isRunning = Flow[Element].collect {
    case out: Outcoming => out
  }
  val magicFlow: Flow[Incoming, Element, NotUsed] = Flow[Incoming]
    .map(_ => if (Random.nextBoolean()) Outcoming() else Persistent())

  Flow.fromGraph {
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val magic = b.add(magicFlow)
      val bcast = b.add(Broadcast[Element](2))
      val sink = b.add(isRunning)
      // auxSink is wired in here, but its materialized value is discarded
      bcast.out(0) ~> isPersistent ~> auxSink
      magic.out ~> bcast.in
      bcast.out(1) ~> isRunning ~> sink.in
      FlowShape(magic.in, sink.out)
    }
  }
}
Is there a way to somehow pass the auxSink's Mat to the resulting Flow?
Thanks.
The Akka Streams library calls these results materialized values. When you plug components together you have an inert graph; calling the run method brings the graph to life, that is, it materializes it. The value returned by materializing a graph is called its materialized value.
In other words, data flows through the graph as a response to demand from receivers. Producers then comply and send more elements downstream. A second (transparent) protocol kicks in when production of elements is faster than demand. This protocol (backpressure) slows down the producers and ensures no data is lost.
Source[String, akka.NotUsed] = ... Sink: This is the exit point of your stream. There must be at least one in every stream. The Sink is the last element of the stream; essentially, it is a subscriber to the data sent or processed by a Source.
Akka Streams has three major components: Source, Flow, and Sink. Any non-cyclic stream consists of at least two of them, a Source and a Sink, plus any number of Flow elements. Source and Sink can be seen as special cases of Flow.
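As a rough illustration of the terms above, here is a minimal sketch of a Source, a Flow and a Sink plugged together, where calling run() yields the sink's materialized value. It assumes Akka 2.6, where an implicit ActorSystem also supplies the materializer; the object name MaterializedValueDemo and the method sumOfDoubles are made up for this example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object; assumes Akka 2.6 on the classpath.
object MaterializedValueDemo {
  def sumOfDoubles(upTo: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("mat-demo")
    try {
      val source = Source(1 to upTo)              // Source[Int, NotUsed]
      val double = Flow[Int].map(_ * 2)           // Flow[Int, Int, NotUsed]
      val sum    = Sink.fold[Int, Int](0)(_ + _)  // Sink[Int, Future[Int]]

      // The graph is inert until run() is called.
      // Keep.right selects the sink's materialized value (a Future[Int]).
      val result: Future[Int] =
        source.via(double).toMat(sum)(Keep.right).run()

      Await.result(result, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(sumOfDoubles(10)) // 2 * (1 + ... + 10) = 110
}
```

Blocking with Await is only for the sake of a short demo; in real code you would normally compose on the Future instead.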
Answering my own question...
Found it! The source of Flow.alsoToMat pointed me to exactly the logic I needed: to access the materialized value of an auxiliary graph (in my case auxSink), one has to import its shape into the graph being constructed by passing it as a parameter to GraphDSL.create().
def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, Mat] = {
  val isPersistent = ...
  val isRunning = ...
  val magicFlow = ...

  Flow.fromGraph {
    GraphDSL.create(auxSink) { implicit b => aux =>
      import GraphDSL.Implicits._
      val magic = b.add(magicFlow)
      val bcast = b.add(Broadcast[Element](2))
      val sink = b.add(isRunning)
      magic ~> bcast ~> isPersistent ~> aux
               bcast ~> isRunning ~> sink
      FlowShape(magic.in, sink.out)
    }
  }
}
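For completeness, here is a sketch of how the resulting flow might be used so that the auxiliary sink's materialized value is actually retrieved when the stream runs. It assumes Akka 2.6 (an implicit ActorSystem doubles as the materializer); the object AuxSinkDemo, the method run, and the choice of Sink.seq for both sinks are illustrative assumptions, not part of the original code. The filter stage is added to the graph once here instead of being applied twice, which does not change the result.

```scala
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

// Hypothetical demo object; assumes Akka 2.6 on the classpath.
object AuxSinkDemo {
  case class Incoming()
  trait Element
  case class Outcoming() extends Element
  case class Persistent() extends Element

  def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, Mat] = {
    val isPersistent = Flow[Element].collect { case p: Persistent => p }
    val isRunning    = Flow[Element].collect { case o: Outcoming => o }
    val magicFlow    = Flow[Incoming]
      .map(_ => if (Random.nextBoolean()) Outcoming() else Persistent())

    // Passing auxSink to create() imports its shape and makes its
    // materialized value the materialized value of the whole flow.
    Flow.fromGraph(GraphDSL.create(auxSink) { implicit b => aux =>
      import GraphDSL.Implicits._
      val magic = b.add(magicFlow)
      val bcast = b.add(Broadcast[Element](2))
      val out   = b.add(isRunning)
      magic ~> bcast ~> isPersistent ~> aux
               bcast ~> out
      FlowShape(magic.in, out.out)
    })
  }

  // Runs n elements through the flow; returns (persisted count, emitted count).
  def run(n: Int): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("aux-sink-demo")
    try {
      val (persisted, emitted) =
        Source(List.fill(n)(Incoming()))
          .viaMat(flow(Sink.seq[Persistent]))(Keep.right) // keep auxSink's Mat
          .toMat(Sink.seq[Outcoming])(Keep.both)          // plus the main sink's
          .run()
      (Await.result(persisted, 5.seconds).size, Await.result(emitted, 5.seconds).size)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (p, e) = run(100)
    println(s"persisted=$p emitted=$e") // the split is random; the two counts add up to 100
  }
}
```

Keep.right on viaMat is what carries the auxiliary sink's Mat forward; with the default via, the flow's materialized value would be dropped in favor of the source's.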