I have created an Akka Stream which has a simple `Source`, `Flow` and `Sink`. With this I can easily send elements through it. Now I want to change this stream so that the `Flow` returns an `Option`. Depending on the result of the `Option`, I want to change the output of the `Flow`.
Is it possible to create a construction like this?
Akka Streams consists of three major components: `Source`, `Flow` and `Sink`.
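`GraphDSL.create` is a curried function. In its simplest form, the first argument list is empty and the second argument is a function. That function takes a mutable data structure called a `Builder`, which is typed with the materialized value; this is `NotUsed` when nothing is surfaced outside of the stream. When graphs are passed in the first argument list, as in the answers below, the function receives the builder and then the shapes of those graphs, and the builder is typed with their combined materialized value.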
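A minimal sketch of the simplest form, assuming Akka 2.6.x on Scala 2.13 run as a scala-cli script (the `//> using` lines are only there for that tool). The `pairWithSquare` name and the `Broadcast`/`Zip` wiring are illustrative and not taken from the answers below; the point is the empty first argument list and the `Builder[NotUsed]` parameter, with the resulting graph wrapped as a `Flow` so it can be run normally.

```scala
//> using scala 2.13
//> using dep com.typesafe.akka::akka-stream:2.6.20

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, Zip}
import scala.concurrent.Await
import scala.concurrent.duration._

// In Akka 2.6+, an implicit ActorSystem also provides the Materializer
implicit val system: ActorSystem = ActorSystem("graphdsl-sketch")

// GraphDSL.create() has an empty first argument list; the second is a function
// from a Builder[NotUsed] to the shape of the graph being built.
val pairWithSquare: Flow[Int, (Int, Int), NotUsed] = Flow.fromGraph(
  GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
    import GraphDSL.Implicits._
    val bcast = builder.add(Broadcast[Int](2))
    val zip = builder.add(Zip[Int, Int]())
    // One branch passes the element through, the other squares it
    bcast.out(0) ~> zip.in0
    bcast.out(1) ~> Flow[Int].map(x => x * x) ~> zip.in1
    // Expose the Broadcast input and the Zip output as the Flow's ports
    FlowShape(bcast.in, zip.out)
  }
)

val result = Await.result(Source(1 to 3).via(pairWithSquare).runWith(Sink.seq), 3.seconds)
println(result)
system.terminate()
```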
Akka Streams components work with a demand-based protocol. In other words, data flows through the graph as a response to demand from receivers. Producers then comply and send more elements downstream. A second (transparent) protocol kicks in when production of elements is faster than demand.
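To see the demand-driven side in action, here is a small sketch under the same assumptions (Akka 2.6.x, Scala 2.13, scala-cli). The source wraps an infinite iterator, yet the stream finishes because `take(5)` stops signalling demand and cancels upstream; the hypothetical `produced` counter records how many elements the iterator was actually asked for.

```scala
//> using scala 2.13
//> using dep com.typesafe.akka::akka-stream:2.6.20

import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("demand-sketch")

// Incremented each time the iterator hands out an element, i.e. only on demand
val produced = new AtomicInteger(0)

val firstFive = Source
  .fromIterator(() => Iterator.from(1).map { n => produced.incrementAndGet(); n })
  .take(5) // after five elements, take completes and cancels its upstream
  .runWith(Sink.seq)

val result = Await.result(firstFive, 3.seconds)
println(s"result = $result, produced = ${produced.get}")
system.terminate()
```

The counter stays near five instead of growing without bound; the exact figure can depend on internal buffering, so treat it as indicative.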
Akka Streams is highly performant and fault-tolerant, but it was built for a different purpose than Kafka: with Kafka, the cluster acts as a message bus and your application is a client of it, whereas Akka Streams is an integral part of your application's logic.
Both answers given so far involve `Broadcast`. It might work in this specific example, but in more complex graphs `Broadcast` might not be a sensible choice.
The reason is that `Broadcast` always backpressures if at least one of its downstreams backpressures.
The best backpressure-aware solution is `Partition`, which propagates backpressure selectively, only from the branch chosen by the partitioner function.
The example below elaborates on the answer by T-Fowl:
```scala
import akka.stream.SinkShape
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, Sink}

def splittingSink[T, M1, M2, Mat](f: T => Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) => Mat): Sink[T, Mat] = {
  val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder =>
    (sink1, sink2) => {
      import GraphDSL.Implicits._
      // Route Some(_) to output 0 and None to output 1
      def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1)
      val partition = builder.add(Partition[Option[T]](2, partitioner))
      partition.out(0) ~> Flow[Option[T]].collect { case Some(t) => t } ~> sink1.in
      partition.out(1) ~> Flow[Option[T]].collect { case None => None } ~> sink2.in
      // Apply f first, then hand each Option[T] to the partition
      val mapper = builder.add(Flow.fromFunction(f))
      mapper.out ~> partition.in
      // The graph exposes only the mapper's input, so it is a Sink[T]
      SinkShape(mapper.in)
    }
  }
  Sink.fromGraph(graph)
}
```
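To check the routing concretely, the sketch below applies the same `Partition` idea to the question's `Int => Option[String]` flow inside a closed graph. It assumes Akka 2.6.x on Scala 2.13 via scala-cli; the sinks are swapped for `Sink.seq` and combined with `Keep.both` only so that both branches can be inspected after the run.

```scala
//> using scala 2.13
//> using dep com.typesafe.akka::akka-stream:2.6.20

import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Partition, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("partition-sketch")

// Even numbers become Some(string), odd numbers become None
val toOption = Flow[Int].map {
  case x if x % 2 == 0 => Some(x.toString)
  case _               => None
}

// Keep.both keeps the materialized Futures of both Sink.seq sinks as a tuple
val graph = RunnableGraph.fromGraph(
  GraphDSL.create(Sink.seq[String], Sink.seq[Option[String]])(Keep.both) { implicit builder => (someSink, noneSink) =>
    import GraphDSL.Implicits._
    // Output 0 receives Some(_), output 1 receives None
    val partition = builder.add(Partition[Option[String]](2, o => if (o.isDefined) 0 else 1))
    Source(1 to 6) ~> toOption ~> partition.in
    partition.out(0) ~> Flow[Option[String]].collect { case Some(s) => s } ~> someSink
    partition.out(1) ~> noneSink
    ClosedShape
  }
)

val (somesF, nonesF) = graph.run()
val somes = Await.result(somesF, 3.seconds)
val nones = Await.result(nonesF, 3.seconds)
println(s"Some branch: $somes, None branch: $nones")
system.terminate()
```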
Suppose you have something like this:
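```scala
import akka.stream.scaladsl.{Flow, Sink, Source}

val source = Source(1 to 100)
// Even numbers become Some(string), odd numbers become None
val flow = Flow[Int].map {
  case x if x % 2 == 0 => Some(x.toString)
  case _ => None
}
val sink1 = Sink.foreach[String](println)
val sink2 = Sink.foreach[None.type](_ => println("dropped element"))
```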
You can make a runnable graph with the desired structure as follows:
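```scala
// alsoTo sends every element to the side branch too; each branch's collect keeps only its own case
val runnable = source
  .via(flow)
  .alsoTo(Flow[Option[String]].collect { case None => None }.to(sink2))
  .to(Flow[Option[String]].collect { case Some(x) => x }.to(sink1))
// runnable.run() needs an implicit Materializer (an implicit ActorSystem in Akka 2.6+)
```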
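`alsoTo` delivers every element to both sinks and relies on `collect` to discard the unwanted half. If you are on Akka 2.6 or later, the `divertTo` operator is an alternative not used in the answers here: it sends each element either to a side sink or to the main downstream, depending on a predicate. A minimal sketch, assuming Akka 2.6.x on Scala 2.13 via scala-cli:

```scala
//> using scala 2.13
//> using dep com.typesafe.akka::akka-stream:2.6.20

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("divert-to-sketch")

val flow = Flow[Int].map {
  case x if x % 2 == 0 => Some(x.toString)
  case _               => None
}

// Elements matching the predicate go here and do not continue downstream
val noneSink = Sink.foreach[Option[String]](_ => println("dropped element"))

val kept = Await.result(
  Source(1 to 10)
    .via(flow)
    .divertTo(noneSink, _.isEmpty)
    .collect { case Some(x) => x } // only Some values reach this point
    .runWith(Sink.seq),
  3.seconds
)
println(kept)
system.terminate()
```

Only the main downstream's materialized value is kept here; the side sink's `Future[Done]` is discarded.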
You can view the flow together with the two sinks as a sink in itself. To construct more complicated graphs, we can use the functions provided in `GraphDSL`.
Consider the generic case:
```scala
import akka.stream.SinkShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink}

def splittingSink[T, M1, M2, Mat](f: T => Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) => Mat): Sink[T, Mat] = {
  val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder =>
    (sink1, sink2) => {
      import GraphDSL.Implicits._
      // Here we broadcast the Option[T] values to 2 flows,
      // each filtering to the correct type for each sink
      val bcast = builder.add(Broadcast[Option[T]](2))
      bcast.out(0) ~> Flow[Option[T]].collect { case Some(t) => t } ~> sink1.in
      bcast.out(1) ~> Flow[Option[T]].collect { case None => None } ~> sink2.in
      // The flow that maps T => Option[T]
      val mapper = builder.add(Flow.fromFunction(f))
      mapper.out ~> bcast.in
      // The whole thing is a Sink[T]
      SinkShape(mapper.in)
    }
  }
  Sink.fromGraph(graph)
}
```
This returns a `Sink[T, Mat]` that, using the provided function, maps the incoming `T` elements into an `Option[T]`, which is then directed to one of the provided sinks.
An example of the usage:
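```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{ExecutionContext, Future}
// Akka 2.6+: an implicit ActorSystem also provides the Materializer
implicit val system: ActorSystem = ActorSystem("example")
implicit val ec: ExecutionContext = system.dispatcher
val sink = splittingSink(
  (s: String) => if (s.length % 2 == 0) Some(s) else None,
  Sink.foreach[String](println),
  Sink.foreach[None.type](_ => println("None")),
  // Complete only when both sinks have completed
  (f1: Future[Done], f2: Future[Done]) => Future.sequence(Seq(f1, f2)).map(_ => Done)
)
Source(List("One", "Two", "Three", "Four", "Five", "Six"))
  .runWith(sink)
  .onComplete(_ => println("----\nDone"))
```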
Output:
```
None
None
None
Four
Five
None
----
Done
```
Usage of `GraphDSL` is discussed further in the documentation section on Stream Graphs.