How to assemble an Akka Streams sink from multiple file writes?

I'm trying to integrate an Akka Streams-based flow into my Play 2.5 app. The idea is that you can stream in a photo and have it written to disk as the raw file, a thumbnailed version, and a watermarked version.

I managed to get this working using a graph something like this:

val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray})
                                    .map(_.result().toArray)

def toByteArray = Flow[ByteString].map(b => b.toArray)

val graph = Flow.fromGraph(GraphDSL.create() {implicit builder =>
  import GraphDSL.Implicits._
  val streamFan = builder.add(Broadcast[ByteString](3))
  val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
  val output = builder.add(Flow[ByteString].map(x => Success(Done)))

  val rawFileSink = FileIO.toFile(file)
  val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
  val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))

  streamFan.out(0) ~> rawFileSink
  streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
  streamFan.out(2) ~> output.in

  byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink
  byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink

  FlowShape(streamFan.in, output.out)
})


Then I wire it into my Play controller using an accumulator like this:

val sink = Sink.head[Try[Done]]

val photoStorageParser = BodyParser { req =>
     Accumulator(sink).through(graph).map(Right.apply)
}

The problem is that my two processed-file sinks aren't completing: both processed files end up with zero size, while the raw file is written correctly. My theory is that the accumulator only waits on one of the outputs of my fan-out. When the input stream completes, my byteAccumulator emits the complete file, but Play has already received the materialized value from the output branch before the processing finishes.

So, my questions are:
Am I on the right track with this as far as my approach goes? What is the expected behaviour for running a graph like this? How can I bring all my sinks together to form one final sink?

Tompey asked May 10 '16 07:05


1 Answer

OK, after a little help (Andreas was on the right track), I've arrived at this solution, which does the trick:

val rawFileSink = FileIO.toFile(file)
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))

val graph = Sink.fromGraph(GraphDSL.create(rawFileSink, thumbnailFileSink, watermarkedFileSink)((_, _, _)) {
  implicit builder => (rawSink, thumbSink, waterSink) => {
    import GraphDSL.Implicits._ // brings the ~> operator into scope

    val streamFan = builder.add(Broadcast[ByteString](2))
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))

    streamFan.out(0) ~> rawSink
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in

    byteArrayFan.out(0) ~> processorFlow(Thumbnail) ~> thumbSink
    byteArrayFan.out(1) ~> processorFlow(Watermarked) ~> waterSink

    SinkShape(streamFan.in)
  }
})

// Complete only once all three file writes have finished.
val theSink = graph.mapMaterializedValue[Future[Try[Done]]](fs => Future.sequence(Seq(fs._1, fs._2, fs._3)).map(_ => Success(Done)))

After which it's dead easy to call this from Play:

val photoStorageParser = BodyParser { req =>
  Accumulator(theSink).map(Right.apply)
}

def createImage(path: String) = Action(photoStorageParser) { req =>
  Created
}
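
For anyone who wants to try the pattern in isolation, here is a minimal sketch of the same idea: pass the sinks into GraphDSL.create so their materialized values are kept, then merge the resulting futures into one. It assumes an Akka 2.4-era setup (ActorMaterializer), as shipped with Play 2.5. The names CombinedSinkSketch, sumSink and countSink are made up for illustration and stand in for the file sinks above.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SinkShape}
import akka.stream.scaladsl.{Broadcast, GraphDSL, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-alone example; not part of the Play app above.
object CombinedSinkSketch {

  def run(): (Int, Int) = {
    implicit val system = ActorSystem("sketch")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for combining the futures

    // Two stand-in sinks; each materializes its own Future.
    val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
    val countSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)((n, _) => n + 1)

    // Passing the sinks to GraphDSL.create keeps both materialized values
    // (here as a tuple) instead of discarding them inside the graph.
    val combined: Sink[Int, Future[(Int, Int)]] =
      Sink.fromGraph(GraphDSL.create(sumSink, countSink)((_, _)) {
        implicit builder => (sum, count) =>
          import GraphDSL.Implicits._
          val fan = builder.add(Broadcast[Int](2))
          fan.out(0) ~> sum.in
          fan.out(1) ~> count.in
          SinkShape(fan.in)
      }).mapMaterializedValue { case (s, c) => s.zip(c) } // done when both are done

    try Await.result(Source(1 to 4).runWith(combined), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints (10,4)
}
```

The combined future only completes after every branch has finished, which is the property the Play Accumulator needs before it hands back a result.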
Tompey answered Oct 20 '22 01:10