Akka Streams - How to keep materialized value of an auxiliary Sink in a Graph

I have a function that returns a Flow. Its logic routes some of the stream's elements to an auxiliary Sink that is passed in as a parameter. I want to retain the auxiliary Sink's materialized value so that I can act on it once the constructed stream is run.

Here's a rough picture of the flow I'm building:

IN ~> (logic: In => Out) ~> Broadcast ~> AuxFilter ~> AuxSink
                                      ~> OutFilter ~> OUT

Sample code:

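import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink}
import scala.util.Random

case class Incoming()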
trait Element
case class Outcoming() extends Element
case class Persistent() extends Element

def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, NotUsed] = {
  val isPersistent = Flow[Element].collect {
    case persistent: Persistent => persistent
  }

  val isRunning = Flow[Element].collect {
    case out: Outcoming => out
  }

  val magicFlow: Flow[Incoming, Element, NotUsed] = Flow[Incoming]
    .map(_ => if (Random.nextBoolean()) Outcoming() else Persistent())

  Flow.fromGraph {
    GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val magic = b.add(magicFlow)
      val bcast = b.add(Broadcast[Element](2))
      val sink = b.add(isRunning)

                   bcast.out(0) ~> isPersistent ~> auxSink
      magic.out ~> bcast.in
                   bcast.out(1) ~> isRunning ~> sink.in

      FlowShape(magic.in, sink.out)
    }
  }
}

Is there a way to somehow pass the auxSink's Mat to the resulting Flow?

Thanks.

Sergey asked Oct 21 '16 18:10


1 Answer

Answering my own question...

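Found it! The source of Flow.alsoToMat pointed me to exactly the logic I needed: to access the materialized value of an auxiliary graph (in my case auxSink), one has to import its shape into the graph being constructed by passing it as a parameter to GraphDSL.create(). The builder block then receives the imported shape as a second argument (aux below), and the materialized value type of the resulting Flow becomes Mat instead of NotUsed.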

def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, Mat] = {
  val isPersistent = ...
  val isRunning = ...
  val magicFlow = ...

  Flow.fromGraph {
    GraphDSL.create(auxSink) { implicit b => aux =>
      import GraphDSL.Implicits._

      val magic = b.add(magicFlow)
      val bcast = b.add(Broadcast[Element](2))
      val sink = b.add(isRunning)

      magic ~> bcast ~> isPersistent ~> aux
               bcast ~> isRunning ~> sink

      FlowShape(magic.in, sink.out)
    }
  }
}
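
For completeness, here is a minimal self-contained sketch of the same approach together with a way to run it and read the auxiliary Sink's materialized value. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream (older versions need an explicit ActorMaterializer). The names AuxSinkMatExample, run and isOutcoming are mine and purely illustrative, and Sink.seq is just one possible auxiliary Sink.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

object AuxSinkMatExample {
  final case class Incoming()
  sealed trait Element
  final case class Outcoming() extends Element
  final case class Persistent() extends Element

  def flow[Mat](auxSink: Sink[Persistent, Mat]): Flow[Incoming, Outcoming, Mat] = {
    val isPersistent = Flow[Element].collect { case p: Persistent => p }
    val isOutcoming  = Flow[Element].collect { case o: Outcoming => o }

    // Same random routing as in the question: each input becomes exactly one Element.
    val magicFlow: Flow[Incoming, Element, NotUsed] =
      Flow[Incoming].map(_ => if (Random.nextBoolean()) Outcoming() else Persistent())

    // Passing auxSink to create() imports its shape and makes its
    // materialized value the materialized value of the whole graph.
    Flow.fromGraph(GraphDSL.create(auxSink) { implicit b => aux =>
      import GraphDSL.Implicits._

      val magic = b.add(magicFlow)
      val bcast = b.add(Broadcast[Element](2))
      val out   = b.add(isOutcoming)

      magic ~> bcast ~> isPersistent ~> aux
               bcast ~> out

      FlowShape(magic.in, out.out)
    })
  }

  // Pushes n elements through the flow and returns
  // (number sent to the auxiliary sink, number emitted downstream).
  def run(n: Int): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("aux-sink-example")
    try {
      val (persistedF, emittedF) =
        Source(List.fill(n)(Incoming()))
          .viaMat(flow(Sink.seq[Persistent]))(Keep.right) // keep the flow's Mat (the aux sink's Future)
          .toMat(Sink.seq[Outcoming])(Keep.both)
          .run()

      (Await.result(persistedF, 5.seconds).size, Await.result(emittedF, 5.seconds).size)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (persisted, emitted) = run(100)
    println(s"persisted=$persisted, emitted=$emitted")
  }
}
```

The split between the two counts is random, but they should always add up to the number of inputs, because every Element goes to exactly one of the two branches. Note that the caller has to pick Keep.right (or Keep.both) in viaMat; the default via keeps the left side and would discard the auxiliary Sink's value again.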

Sergey answered Oct 21 '22 00:10