
Akka Streams: How do I get Materialized Sink output from GraphDSL API?

This is a simple newbie question about the GraphDSL API. I have read several related SO threads and still don't see the answer:

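import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, ThrottleMode}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.duration._

val actorSystem = ActorSystem("QuickStart")
val executor = actorSystem.dispatcher
val materializer = ActorMaterializer()(actorSystem)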

val source: Source[Int, NotUsed] = Source(1 to 5)
val throttledSource = source.throttle(1, 1.second, 1, ThrottleMode.shaping)
val intDoublerFlow = Flow.fromFunction[Int, Int](i => i * 2)
val sink = Sink.foreach(println)

val graphModel = GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  throttledSource ~> intDoublerFlow ~> sink

  // I presume I want to change this shape to something else
  // but I can't figure out what it is.
  ClosedShape
}
// TODO: This is RunnableGraph[NotUsed], I want RunnableGraph[Future[Done]] that gives the
// materialized Future[Done] from the sink. I presume I need to use a GraphDSL SourceShape
// but I can't get that working.
val graph = RunnableGraph.fromGraph(graphModel)

// This works and gives me the materialized sink output using the simpler API.
// But I want to use the GraphDSL so that I can add branches or junctures.
val graphThatIWantFromDslAPI = throttledSource.toMat(sink)(Keep.right)
clay asked Feb 23 '17 21:02


2 Answers

The trick is to pass the stage whose materialized value you want (in your case, sink) to GraphDSL.create. The function you pass as the second parameter changes as well: it now takes an additional parameter, the shape of that stage (s in the example below), which you wire into your graph in place of the original stage.

  val graphModel: Graph[ClosedShape, Future[Done]] = GraphDSL.create(sink) { implicit b => s =>
    import GraphDSL.Implicits._

    throttledSource ~> intDoublerFlow ~> s

    // ClosedShape is just fine - it is always the shape of a RunnableGraph
    ClosedShape
  }
  val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel)

More info can be found in the docs.
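
Since the question mentions wanting branches, here is a minimal sketch of the same technique with a Broadcast junction. The names doubledSink, rawSink and BranchingGraphSketch are made up for illustration, the throttle is left out for brevity, and the materializer setup assumes the same ActorMaterializer style as the question (newer Akka versions deprecate it in favour of an implicit ActorSystem).

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BranchingGraphSketch {
  // Hypothetical sinks, not from the question.
  val doubledSink: Sink[Int, Future[Done]] = Sink.foreach[Int](i => println(s"doubled: $i"))
  val rawSink: Sink[Int, Future[Done]]     = Sink.foreach[Int](i => println(s"raw: $i"))

  // Only doubledSink is passed to create, so only its Future[Done] is exposed.
  val graph: RunnableGraph[Future[Done]] =
    RunnableGraph.fromGraph(GraphDSL.create(doubledSink) { implicit b => s =>
      import GraphDSL.Implicits._

      // Fan the source out into two branches.
      val bcast = b.add(Broadcast[Int](2))

      Source(1 to 5) ~> bcast
      bcast ~> Flow[Int].map(_ * 2) ~> s // s is the imported shape of doubledSink
      bcast ~> rawSink                   // this sink's materialized value is dropped

      ClosedShape
    })

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("QuickStart")
    implicit val materializer: ActorMaterializer = ActorMaterializer()(system)

    // The future completes once doubledSink has consumed the last element.
    val done: Future[Done] = graph.run()
    Await.result(done, 10.seconds)
    system.terminate()
  }
}
```

Stages that are not passed to create (rawSink here) can still be wired in directly; they just do not contribute to the graph's materialized value.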

Stefano Bonetti answered Nov 14 '22 09:11


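import akka.Done
import akka.stream.scaladsl.GraphDSL.Builder
import scala.concurrent.Future

// Passing `sink` to create exposes its materialized value. The inner `sink`
// parameter is the imported SinkShape and shadows the outer Sink.
val graphModel = GraphDSL.create(sink) { implicit b: Builder[Future[Done]] => sink =>
  import akka.stream.scaladsl.GraphDSL.Implicits._

  throttledSource ~> intDoublerFlow ~> sink

  ClosedShape
}
val graph: RunnableGraph[Future[Done]] = RunnableGraph.fromGraph(graphModel)
val graphThatIWantFromDslAPI: RunnableGraph[Future[Done]] = throttledSource.toMat(sink)(Keep.right)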

The difficulty with the GraphDSL API is that create is heavily overloaded. The plain create() gives you a Builder[NotUsed]. Passing your sink to create gives you a Builder[Future[Done]] instead, and the block you supply becomes a function builder => sink => shape rather than builder => shape.
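
The same pattern extends to more than one stage: create also has overloads that take several graphs plus a function combining their materialized values. A rough sketch, assuming two hypothetical sinks (printSink and sumSink) and the same ActorMaterializer setup as in the question:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object TwoMaterializedValuesSketch {
  // Hypothetical sinks, not from the question.
  val printSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)
  val sumSink: Sink[Int, Future[Int]]    = Sink.fold[Int, Int](0)(_ + _)

  // (_, _) combines both materialized values into a tuple.
  val graph: RunnableGraph[(Future[Done], Future[Int])] =
    RunnableGraph.fromGraph(
      GraphDSL.create(printSink, sumSink)((_, _)) { implicit b => (p, s) =>
        import GraphDSL.Implicits._

        val bcast = b.add(Broadcast[Int](2))

        Source(1 to 5) ~> bcast
        bcast ~> Flow[Int].map(_ * 2) ~> p // prints the doubled values
        bcast ~> s                         // sums the original values

        ClosedShape
      })

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("QuickStart")
    implicit val materializer: ActorMaterializer = ActorMaterializer()(system)

    val (done, sum) = graph.run()
    Await.result(done, 10.seconds)
    println(s"sum of inputs: ${Await.result(sum, 10.seconds)}")
    system.terminate()
  }
}
```

Here the builder is a Builder[(Future[Done], Future[Int])], and the block receives one shape per graph passed to create.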

Martin Seeler answered Nov 14 '22 07:11