 

Akka streams: Reading multiple files

I have a list of files. I want:

  1. To read from all of them as a single Source.
  2. Files should be read sequentially, in-order. (no round-robin)
  3. At no point should any file be required to be entirely in memory.
  4. An error reading from a file should collapse the stream.

It felt like this should work (Scala, akka-streams v2.4.7):

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
    .map(bs => bs.utf8String)
  )
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines

But that results in a compile error since FileIO has a materialized value associated with it, and Source.combine doesn't support that.

Mapping the materialized value away makes me wonder how file-read errors get handled, but does compile:

val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
    .via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
    .map(bs => bs.utf8String)
    .mapMaterializedValue(f => NotUsed.getInstance())
  )
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _))  // counting lines

But it throws an IllegalArgumentException at runtime:

java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
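For what it's worth, the runtime error appears to come from the fan-in strategy rather than from the files. MergePreferred has an extra "preferred" inlet on top of the inlets that Source.combine wires up, which seems to be what the shape check is complaining about. Even when it fits, MergePreferred interleaves elements instead of reading the sources in order. Below is a minimal sketch, assuming Akka Streams 2.4.x, that calls Source.combine once over all sources with Concat, which drains its inputs one after another. The in-memory sources stand in for the file sources, and the object name CombineWithConcat is hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Concat, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object CombineWithConcat {
  def run(): Seq[String] = {
    implicit val system = ActorSystem("combine-concat")
    implicit val materializer = ActorMaterializer()
    try {
      // In-memory stand-ins for the per-file sources. Source.combine accepts
      // Source[T, _], so the materialized values don't have to match.
      val sources: Seq[Source[String, _]] = Seq(
        Source(List("a1", "a2")),
        Source(List("b1")),
        Source(List("c1", "c2"))
      )
      // A single combine call over all sources (no pairwise reduce). Concat(n)
      // has exactly n inputs and emits everything from input 0, then input 1, etc.
      val combined = Source.combine(sources(0), sources(1), sources.drop(2): _*)(Concat(_))
      Await.result(combined.runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

One caveat: every source handed to Source.combine is materialized when the combined stream starts. With FileIO sources, that means all files are opened up front, even though Concat only reads from one at a time.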
asked Jun 13 '16 by randomstatistic



2 Answers

The code below is not as terse as it could be, in order to clearly modularize the different concerns.

import java.io.File
import java.nio.file.Path
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source}
import akka.util.ByteString
// Assumes an implicit ActorMaterializer and ExecutionContext are in scope

// Given a stream of ByteStrings delimited by the system line separator, we can get lines represented as Strings
val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String)

// Given a stream of Paths, we read those files one after another and count the total number of lines
val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines)).fold(0L)((count, _) => count + 1).toMat(Sink.head)(Keep.right)

// Here's our test data source (replace paths with real paths)
val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath))

// Runs the line counter over the test files; this returns a Future with the number of lines, which we print to the console when it completes
testFiles runWith lineCounter foreach println
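The lines flow relies on Framing re-assembling lines that span chunk boundaries (FileIO.fromPath emits fixed-size chunks), and on allowTruncation = true so that a last line without a trailing separator is still emitted. Here is a minimal sketch, assuming Akka Streams 2.4.x, that runs the same kind of framing flow over in-memory ByteString chunks instead of files. The delimiter is fixed to "\n" (an assumption, in place of System.lineSeparator) so the result doesn't depend on the platform, and the object name LinesFlowDemo is hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object LinesFlowDemo {
  // Same shape as the lines flow above, but with "\n" hard-coded as the delimiter.
  val lines = Framing
    .delimiter(ByteString("\n"), 10000, allowTruncation = true)
    .map(_.utf8String)

  def run(): Seq[String] = {
    implicit val system = ActorSystem("lines-demo")
    implicit val materializer = ActorMaterializer()
    try {
      // The chunk boundary deliberately splits "bb" to show that framing re-assembles it.
      // The final "c" has no trailing "\n" and is only emitted because allowTruncation = true.
      val chunks = Source(List(ByteString("a\nb"), ByteString("b\nc")))
      Await.result(chunks.via(lines).runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With allowTruncation = false, the same input would instead fail the stream with a Framing.FramingException, because "c" is never followed by a delimiter.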
answered Sep 21 '22 by Viktor Klang



Update: Oh, I didn't see the accepted answer because I hadn't refreshed the page >_<. I'll leave this here anyway, since I've also added some notes about error handling.

I believe the following program does what you want:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import java.nio.file.Paths
import scala.concurrent.duration._

object TestMain extends App {
  implicit val actorSystem = ActorSystem("test")
  implicit val materializer = ActorMaterializer()
  implicit def ec = actorSystem.dispatcher

  val sources = Vector("build.sbt", ".gitignore")
    .map(Paths.get(_))
    .map(p =>
      FileIO.fromPath(p)
        .viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left)
        .mapMaterializedValue { f =>
          f.onComplete {
            case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p")
            case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}")
            case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e")
          }
          NotUsed
        }
    )
  val finalSource = Source(sources).flatMapConcat(identity)

  val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _))
  result.onComplete {
    case Success(n) => println(s"Read $n lines total")
    case Failure(e) => println(s"Reading failed: $e")
  }
  Await.ready(result, 10.seconds)

  actorSystem.terminate()
}

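The key here is the flatMapConcat() method: it maps each element of the stream to a source and emits the elements of those sources one source at a time, in order. The next source is only materialized after the previous one has completed, so only one file is open and being read at any moment.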
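The ordering guarantee of flatMapConcat can be seen without any files. Here is a minimal sketch, assuming Akka Streams 2.4.x, where each outer element is turned into a small in-memory source. The object name FlatMapConcatDemo is hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FlatMapConcatDemo {
  def run(): Seq[Int] = {
    implicit val system = ActorSystem("flatmapconcat-demo")
    implicit val materializer = ActorMaterializer()
    try {
      // Each inner source runs to completion before the next one is started,
      // so the output keeps the order of the outer elements.
      val flattened = Source(1 to 3).flatMapConcat(i => Source(List(i, i * 10)))
      Await.result(flattened.runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

For contrast, flatMapMerge(breadth, f) runs up to breadth inner sources concurrently, which can be faster but gives up the in-order guarantee the question asks for.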

As for handling errors, you can either add a handler to the IOResult future inside the mapMaterializedValue function, or you can handle the final error of the running stream with a handler on the future materialized by the sink (Sink.reduce here). I did both in the example above, and if you test it on, say, a nonexistent file, you'll see the same error message printed twice. Unfortunately, flatMapConcat() does not collect the materialized values of the inner sources, and frankly I can't see how it could do so sanely, so you have to handle them separately if necessary.
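Requirement 4 (a read error should collapse the stream) can be checked by pointing the pipeline at a file that doesn't exist and inspecting the sink's future. Here is a minimal sketch, assuming Akka Streams 2.4.x. The path and the object name MissingFileDemo are hypothetical, and the sketch only checks that the stream fails, not which exception type is reported.

```scala
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object MissingFileDemo {
  // Hypothetical path, assumed not to exist where this runs.
  val missing = Paths.get("this-file-should-not-exist.txt")

  def run(): Try[Long] = {
    implicit val system = ActorSystem("missing-file")
    implicit val materializer = ActorMaterializer()
    try {
      // Same structure as the answers: a stream of paths flattened with flatMapConcat.
      val bytesRead = Source(List(missing))
        .flatMapConcat(p => FileIO.fromPath(p))
        .map(_.size.toLong)
        .runWith(Sink.fold(0L)(_ + _))
      // Wait until the stream has finished, successfully or not, and return its outcome.
      Await.ready(bytesRead, 5.seconds)
      bytesRead.value.get
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because the failure propagates through flatMapConcat to the sink, a single handler on the sink's future (onComplete, or recover to substitute a fallback value) is enough to observe it. The per-file IOResult handlers are only needed if you want per-file diagnostics.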

answered Sep 19 '22 by Vladimir Matveev
