Sink for line-by-line file IO with backpressure

I have a file-processing job that currently uses Akka actors with manually managed backpressure to handle the processing pipeline, but I've never been able to successfully manage backpressure at the input file reading stage.

The job takes an input file and groups lines by an ID number present at the start of each line. When it hits a line with a new ID number, it pushes the grouped lines to a processing actor via a message, then continues accumulating under the new ID number, all the way until it reaches the end of the file.

This seems like it would be a good use case for Akka Streams, using the File as a sink, but I'm still not sure of three things:

1) How can I read the file line by line?

2) How can I group by the ID present on every line? I currently use very imperative processing for this, and I don't think I'll have the same ability in a stream pipeline.

3) How can I apply backpressure, such that I don't keep reading lines into memory faster than I can process the data downstream?

Asked Apr 15 '16 by TheDarkSaint

1 Answer

Akka Streams' groupBy is one approach, but groupBy takes a maxSubstreams parameter, which would require you to know the maximum number of ID ranges up front (a rough sketch of that alternative appears after the output below). So the solution below uses scan to identify same-ID blocks and splitWhen to split them into substreams:

import java.io.File

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object Main extends App {
  implicit val system = ActorSystem("system")
  implicit val materializer = ActorMaterializer()

  // Each line looks like "ID1,some data": split it into an (id, data) pair.
  def extractId(s: String) = {
    val a = s.split(",")
    a(0) -> a(1)
  }

  val file = new File("/tmp/example.csv")

  // Read the file as a stream of lines: frame on newlines, then decode as UTF-8.
  private val lineByLineSource = FileIO.fromFile(file)
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
    .map(_.utf8String)

  val future: Future[Done] = lineByLineSource
    .map(extractId)
    .scan( (false,"","") )( (l,r) => (l._2 != r._1, r._1, r._2) ) // flag the first row of each ID range
    .drop(1)                                                      // drop scan's zero element
    .splitWhen(_._1)                                              // one substream per ID range
    .fold( ("",Seq[String]()) )( (l,r) => (r._2, l._2 ++ Seq(r._3) ))
    .concatSubstreams
    .runForeach(println)

  private val reply = Await.result(future, 10.seconds)
  println(s"Received $reply")
  Await.ready(system.terminate(), 10.seconds)
}

extractId splits each line into an id -> data tuple. scan prepends each id -> data tuple with a start-of-ID-range flag. The drop removes the zero element that scan emits first. splitWhen starts a new substream at each start-of-range element. fold collects each substream into an (id, list-of-rows) pair, dropping the start-of-ID-range boolean, so that each substream produces a single element. In place of the fold you probably want a custom SubFlow that processes the rows for a single ID and emits some result for the ID range (see the sketch below). concatSubstreams merges the per-ID-range substreams produced by splitWhen back into a single stream that is printed by runForeach.
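As a minimal sketch of such a per-ID stage replacing the fold: Summary and its rowCount field are hypothetical placeholders for whatever result your processing produces per ID range, and the run shown below still uses the original fold version.

// Hypothetical per-ID result type; replace with whatever your processing produces.
case class Summary(id: String, rowCount: Int)

val perIdSummaries: Future[Done] = lineByLineSource
  .map(extractId)
  .scan( (false,"","") )( (l,r) => (l._2 != r._1, r._1, r._2) )
  .drop(1)
  .splitWhen(_._1)
  // Process rows one at a time instead of buffering the whole ID range in a Seq.
  .fold(Summary("", 0))((acc, row) => Summary(row._2, acc.rowCount + 1))
  .concatSubstreams
  .runForeach(println)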

Given this input file:

$ cat /tmp/example.csv
ID1,some input
ID1,some more input
ID1,last of ID1
ID2,one line of ID2
ID3,2nd before eof
ID3,eof

Output is:

(ID1,List(some input, some more input, last of ID1))
(ID2,List(one line of ID2))
(ID3,List(2nd before eof, eof))
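For reference, the groupBy approach mentioned at the top might look roughly like the sketch below. The maxSubstreams bound (an arbitrary 1000 here) must cover the number of distinct IDs, but unlike splitWhen it does not rely on the input being sorted by ID:

// Sketch of the groupBy alternative: the first argument is maxSubstreams, an
// upper bound on distinct IDs; the stream fails if more keys than that appear.
val groupedById: Future[Done] = lineByLineSource
  .map(extractId)
  .groupBy(1000, _._1) // 1000 is an arbitrary upper bound on distinct IDs
  .fold( ("",Seq[String]()) )( (l,r) => (r._1, l._2 ++ Seq(r._2)) )
  .mergeSubstreams
  .runForeach(println)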
Answered by tariksbl