
How do you deal with futures in Akka Flow?

I have built an Akka graph that defines a flow. My objective is to reformat my future response and save it to a file. The flow is outlined below:

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._
      val balancer = builder.add(Balance[(HttpRequest, String)](6, waitForAllDownstreams = false))
      val merger = builder.add(Merge[Future[Map[String, String]]](6))
      val fileSink = FileIO.toPath(outputPath, options)
      val ignoreSink = Sink.ignore
      val in = Source(seeds)
      in ~> balancer.in
      for (i <- Range(0,6)) {
        balancer.out(i) ~>
          wikiFlow.async ~>
          // This maps to a Future[Map[String, String]]
          Flow[(Try[HttpResponse], String)].map(parseHtml) ~>
          merger
      }

      merger.out ~>
      // When we merge we need to map our Map to a file
      Flow[Future[Map[String, String]]].map((d) => {
        // What is the proper way of serializing future map
        // so I can work with it like a normal stream into fileSink?

        // I could manually do ->
        // d.foreach(someWriteToFileProcess(_))
        // with ignoreSink, but this defeats the nice
        // akka flow
      }) ~>
      fileSink

      ClosedShape
    })

I can hack this workflow to write my future map to a file via foreach, but I'm afraid this could somehow lead to concurrency issues with FileIO and it just doesn't feel right. What is the proper way to handle futures with our akka flow?

Dr.Knowitall asked Feb 06 '17 03:02



1 Answer

The easiest way to create a Flow which involves an asynchronous computation is by using mapAsync.

Say you want to create a Flow that consumes Int and produces String using an asynchronous computation mapper: Int => Future[String], with a parallelism of 5.

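import akka.stream.scaladsl.Flow
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global // Future(...) needs an implicit ExecutionContext

val mapper: Int => Future[String] = (i: Int) => Future(i.toString)
val yourFlow = Flow[Int].mapAsync[String](5)(mapper)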

Now, you can use this flow in your graph however you want.

An example usage:

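// Wrapping the closed graph in RunnableGraph.fromGraph lets you call graph.run()
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val intSource = Source(1 to 10)

  val printSink = Sink.foreach[String](s => println(s))

  // Future(...) requires an implicit ExecutionContext in scope
  val yourMapper: Int => Future[String] = (i: Int) => Future(i.toString)

  // At most 2 futures in flight; results are emitted in input order
  val yourFlow = Flow[Int].mapAsync[String](2)(yourMapper)

  intSource ~> yourFlow ~> printSink

  ClosedShape
})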
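Applied to the question's case, where the upstream already emits Future[Map[String, String]], one option is mapAsync with identity followed by a map to ByteString, which is what FileIO.toPath consumes. The following is only a sketch under some assumptions: Akka 2.6+ (where the implicit ActorSystem supplies the materializer; on older versions you would add an implicit ActorMaterializer), a parallelism of 4 chosen arbitrarily, and a made-up toLine serializer and temp file standing in for the real output format and outputPath.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Flow, Source}
import akka.util.ByteString
import java.nio.file.Files
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FlattenFutures {
  // Hypothetical serializer: "key=value" pairs sorted by key, one line per map.
  def toLine(m: Map[String, String]): ByteString =
    ByteString(m.toSeq.sorted.map { case (k, v) => s"$k=$v" }.mkString("", ",", "\n"))

  // mapAsync(4)(identity) unwraps each incoming Future without blocking a thread
  // and emits the results downstream in the order the futures arrived.
  val flatten: Flow[Future[Map[String, String]], ByteString, NotUsed] =
    Flow[Future[Map[String, String]]].mapAsync(4)(identity).map(toLine)

  // Runs the flow into a temp file (stand-in for outputPath) and returns its contents.
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    try {
      val path = Files.createTempFile("flatten-futures", ".txt")
      val input = List(
        Future.successful(Map("title" -> "Akka")),
        Future.successful(Map("title" -> "Scala"))
      )
      val done = Source(input).via(flatten).runWith(FileIO.toPath(path))
      Await.result(done, 5.seconds)
      new String(Files.readAllBytes(path), "UTF-8")
    } finally {
      system.terminate()
    }
  }
}
```

In the question's graph, this would correspond to replacing the Flow[Future[Map[String, String]]].map(...) stage after merger.out with a mapAsync(...)(identity) stage plus a map to ByteString, so the file sink remains the only writer and no manual foreach is needed.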
sarveshseri answered Nov 13 '22 18:11
