Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Akka Stream Option output

I have created an Akka Stream which has a simple Source, Flow and Sink. With this I can easily send elements through it. Now I want to change this stream so that the Flow returns an Option. Depending on the result of the Option I want to change the output of the Flow.

enter image description here

Is it possible to create a construction like this?

like image 652
RemcoW Avatar asked Dec 06 '16 09:12

RemcoW


People also ask

Can you describe what are 3 main components of Akka streams?

Akka streams consist of three major components in it – Source, Flow and Sink. These are described below in detail.

What is GraphDSL?

The GraphDSL. create is a curried function. The first argument list is empty second argument is a function. That function takes a mutable data structure called a Builder which is typed with a materialized value, which in our case will be NotUsed, as we aren't surfacing anything outside of the stream.

How does Akka stream work?

Akka Streams components work with a demand-based protocol. In other words, data flows through the graph as a response to demand from receivers. Producers then comply and send more elements downstream. A second (transparent) protocol kicks in when production of elements is faster than demand.

Does Kafka use Akka?

As I mentioned, Akka Streams is highly performant and fault-tolerant, but it was built for a different purpose. While in Kafka you used it as a message bus and your application was a client API for the Kafka cluster, in here Akka Streams is an integral part of your application's logic.


3 Answers

Both the answers given at this time involve Broadcast. Please note that it might work in this specific example, but in more complex graphs Broadcast might not be a sensible choice. The reason is that Broadcast always backpressures if at least one of the downstreams backpressures. The best backpressure-aware solution is Partition, which is able to selectively propagate backpressure from the branch selected by the Partitioner function.

Example below (elaborating on one of the answer by T-Fowl)

  def splittingSink[T, M1, M2, Mat](f: T ⇒ Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) ⇒ Mat): Sink[T, Mat] = {
    val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder ⇒
      (sink1, sink2) ⇒ {
        import GraphDSL.Implicits._

        def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1)
        val partition = builder.add(Partition[Option[T]](2, partitioner))
        partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> sink1.in
        partition.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> sink2.in

        val mapper = builder.add(Flow.fromFunction(f))
        mapper.out ~> partition.in

        SinkShape(mapper.in)
      }
    }
    Sink.fromGraph(graph)
  }
like image 132
Stefano Bonetti Avatar answered Oct 18 '22 10:10

Stefano Bonetti


Suppose you have something like that

val source = Source(1 to 100)
val flow = Flow[Int].map {
  case x if x % 2 == 0 ⇒ Some(x.toString)
  case _ ⇒ None
}
val sink1 = Sink.foreach[String](println)
val sink2 = Sink.foreach[None.type](x ⇒ println("dropped element"))

You can make runnable graph with desired structure as following:

val runnable = source
  .via(flow)
  .alsoTo(Flow[Option[String]].collect { case None ⇒ None }.to(sink2))
  .to(Flow[Option[String]].collect { case Some(x) ⇒ x }.to(sink1))
like image 4
Odomontois Avatar answered Oct 18 '22 11:10

Odomontois


You can view the flow with the 2 sinks as being a sink in itself. To construct more complicated Graph's we can use the functions provided in GraphDSL.

Consider, in a generic case

def splittingSink[T, M1, M2, Mat](f: T ⇒ Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) ⇒ Mat): Sink[T, Mat] = {
    val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder ⇒
        (sink1, sink2) ⇒ {
            import GraphDSL.Implicits._

            //Here we broadcast the Some[T] values to 2 flows,
            // each filtering to the correct type for each sink
            val bcast = builder.add(Broadcast[Option[T]](2))
            bcast.out(0) ~> Flow[Option[T]].collect { case Some(t) ⇒ t } ~> sink1.in
            bcast.out(1) ~> Flow[Option[T]].collect { case None ⇒ None } ~> sink2.in

            //The flow that maps T => Some[T]
            val mapper = builder.add(Flow.fromFunction(f))
            mapper.out ~> bcast.in

            //The whole thing is a Sink[T]
            SinkShape(mapper.in)
        }
    }
    Sink.fromGraph(graph)
}

This returns a Sink[T,Mat] that, using the provided function, will map the incoming T elements into an Option[T], which is then directed to one of the provided sinks.

An example of the usage:

val sink = splittingSink(
    (s: String) ⇒ if (s.length % 2 == 0) Some(s) else None,
    Sink.foreach[String](s),
    Sink.foreach[None.type](_ ⇒ println("None")),
    (f1: Future[_], f2: Future[_]) ⇒ Future.sequence(Seq(f1, f2)).map(_ ⇒ Done)
)

Source(List("One", "Two", "Three", "Four", "Five", "Six"))
        .runWith(sink)
        .onComplete(_ ⇒ println("----\nDone"))

Output:

None
None
None
Four
Five
None
----
Done

Usage of GraphDSL are discussed further in the documentation section about Stream Graphs.

like image 3
T-Fowl Avatar answered Oct 18 '22 10:10

T-Fowl