I have created an Akka Stream which has a simple Source, Flow and Sink. With this I can easily send elements through it. Now I want to change this stream so that the Flow returns an Option. Depending on the result of the Option I want to change the output of the Flow.

Is it possible to create a construction like this?
Akka Streams has three major components: Source, Flow and Sink.
GraphDSL.create is a curried function. In this form the first argument list is empty and the second argument is a function. That function takes a mutable data structure called a Builder, which is typed with a materialized value; in our case that is NotUsed, as we aren't surfacing anything outside of the stream.
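As a rough illustration of that call shape, here is a minimal sketch assuming Akka 2.6 (where an implicit ActorSystem provides the materializer). The object name CreateSketch and the toy 1-to-3 pipeline are made up for the example; they are not from the question.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}

// Hypothetical example object, only here to show the shape of the call.
object CreateSketch {
  // Empty first argument list: no sub-graph's materialized value is imported,
  // so the Builder (and the resulting graph) is typed with NotUsed.
  val graph: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(
    GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._
      // The implicits add each stage to the builder and wire the ports.
      Source(1 to 3) ~> Flow[Int].map(_ * 2) ~> Sink.foreach[Int](println)
      ClosedShape // every port is connected, so the graph is runnable
    }
  )

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("create-sketch")
    graph.run() // prints 2, 4, 6
    // Nothing is surfaced from the stream, so there is no completion handle;
    // the sleep is a crude wait that is acceptable only in a sketch.
    Thread.sleep(500)
    system.terminate()
  }
}
```

Because the graph materializes NotUsed, the caller cannot tell when it has finished. Passing sinks in the first argument list, as the answers here do, is how you get their materialized values back out.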
Akka Streams components work with a demand-based protocol. In other words, data flows through the graph as a response to demand from receivers. Producers then comply and send more elements downstream. A second (transparent) protocol kicks in when production of elements is faster than demand.
Akka Streams is highly performant and fault-tolerant, but it was built for a different purpose than Kafka. Kafka is used as a message bus, with your application acting as a client of the Kafka cluster, whereas Akka Streams is an integral part of your application's logic.
Both the answers given at this time involve Broadcast. Please note that it might work in this specific example, but in more complex graphs Broadcast might not be a sensible choice.
The reason is that Broadcast always backpressures if at least one of the downstreams backpressures.
The best backpressure-aware solution is Partition, which is able to selectively propagate backpressure from the branch selected by the Partitioner function.
Example below (elaborating on the answer by T-Fowl):
import akka.stream.SinkShape
import akka.stream.scaladsl.{Flow, GraphDSL, Partition, Sink}

def splittingSink[T, M1, M2, Mat](f: T ⇒ Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) ⇒ Mat): Sink[T, Mat] = {
  val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder ⇒
    (sink1, sink2) ⇒ {
      import GraphDSL.Implicits._
      // Route Some to output 0 and None to output 1
      def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1)
      val partition = builder.add(Partition[Option[T]](2, partitioner))
      partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> sink1.in
      partition.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> sink2.in
      // The flow that maps T => Option[T]
      val mapper = builder.add(Flow.fromFunction(f))
      mapper.out ~> partition.in
      // The whole thing is a Sink[T]
      SinkShape(mapper.in)
    }
  }
  Sink.fromGraph(graph)
}
Suppose you have something like this:
val source = Source(1 to 100)
val flow = Flow[Int].map {
  case x if x % 2 == 0 ⇒ Some(x.toString)
  case _ ⇒ None
}
val sink1 = Sink.foreach[String](println)
val sink2 = Sink.foreach[None.type](x ⇒ println("dropped element"))
You can build a runnable graph with the desired structure as follows:
val runnable = source
  .via(flow)
  .alsoTo(Flow[Option[String]].collect { case None ⇒ None }.to(sink2))
  .to(Flow[Option[String]].collect { case Some(x) ⇒ x }.to(sink1))
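The graph above uses to and alsoTo, which discard the sinks' materialized values, so the caller has no handle on completion. A possible variant is sketched below under the assumption of Akka 2.6: it keeps both values with alsoToMat and toMat and collects into Sink.seq so the result can be inspected. The names AlsoToSketch, run and toOption are made up for the illustration, and the range is shortened to 1 to 6.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; not part of the original answer.
object AlsoToSketch {
  // Even numbers become Some(string), odd numbers become None.
  val toOption: Int => Option[String] =
    x => if (x % 2 == 0) Some(x.toString) else None

  // Runs the split stream and returns (the Some values, the number of Nones).
  def run()(implicit system: ActorSystem): (Seq[String], Int) = {
    // Each branch filters to its own element type and collects into a Seq.
    val someSink = Flow[Option[String]]
      .collect { case Some(x) => x }
      .toMat(Sink.seq[String])(Keep.right)
    val noneSink = Flow[Option[String]]
      .collect { case None => None }
      .toMat(Sink.seq[None.type])(Keep.right)

    val (noneF, someF) = Source(1 to 6)
      .via(Flow[Int].map(toOption))
      .alsoToMat(noneSink)(Keep.right) // keep the None branch's Future
      .toMat(someSink)(Keep.both)      // pair it with the Some branch's Future
      .run()

    // Blocking is acceptable in a sketch; real code would compose the Futures.
    val somes = Await.result(someF, 3.seconds)
    val nones = Await.result(noneF, 3.seconds)
    (somes, nones.size)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("also-to-sketch")
    val (somes, nones) = run()
    println(s"some: ${somes.mkString(", ")}; none count: $nones")
    system.terminate()
  }
}
```

With the shortened range, the Some branch receives "2", "4" and "6" and the None branch receives three elements.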
You can view the flow with the 2 sinks as being a sink in itself. To construct more complicated graphs we can use the functions provided in GraphDSL.
Consider the generic case:
import akka.stream.SinkShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink}

def splittingSink[T, M1, M2, Mat](f: T ⇒ Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) ⇒ Mat): Sink[T, Mat] = {
  val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder ⇒
    (sink1, sink2) ⇒ {
      import GraphDSL.Implicits._
      // Here we broadcast the Option[T] values to 2 flows,
      // each filtering to the correct type for each sink
      val bcast = builder.add(Broadcast[Option[T]](2))
      bcast.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> sink1.in
      bcast.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> sink2.in
      // The flow that maps T => Option[T]
      val mapper = builder.add(Flow.fromFunction(f))
      mapper.out ~> bcast.in
      // The whole thing is a Sink[T]
      SinkShape(mapper.in)
    }
  }
  Sink.fromGraph(graph)
}
This returns a Sink[T,Mat] that, using the provided function, will map the incoming T elements into an Option[T], which is then directed to one of the provided sinks.
An example of the usage:
// Assumes an implicit ActorSystem and ExecutionContext are in scope,
// plus imports of akka.Done and scala.concurrent.Future
val sink = splittingSink(
  (s: String) ⇒ if (s.length % 2 == 0) Some(s) else None,
  Sink.foreach[String](println),
  Sink.foreach[None.type](_ ⇒ println("None")),
  (f1: Future[_], f2: Future[_]) ⇒ Future.sequence(Seq(f1, f2)).map(_ ⇒ Done)
)
Source(List("One", "Two", "Three", "Four", "Five", "Six"))
  .runWith(sink)
  .onComplete(_ ⇒ println("----\nDone"))
Output:
None
None
None
Four
Five
None
----
Done
Usage of GraphDSL is discussed further in the documentation section about Stream Graphs.