I have a list of files. I want:
It felt like this should work: (Scala, akka-streams v2.4.7)
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
)
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
But that results in a compile error, since FileIO has a materialized value associated with it and Source.combine doesn't support that.
Mapping the materialized value away makes me wonder how file-read errors get handled, but does compile:
val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromPath(f.toPath)
.via(Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true))
.map(bs => bs.utf8String)
.mapMaterializedValue(f => NotUsed.getInstance())
)
val source = sources.reduce( (a, b) => Source.combine(a, b)(MergePreferred(_)) )
source.map(_ => 1).runWith(Sink.reduce[Int](_ + _)) // counting lines
But it throws an IllegalArgumentException at runtime:
java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [MergePreferred.out] must correspond to the inlets [MergePreferred.preferred] and outlets [MergePreferred.out]
The code below is not as terse as it could be, in order to clearly modularize the different concerns.
// Given a stream of bytestrings delimited by the system line separator we can get lines represented as Strings
val lines = Framing.delimiter(ByteString(System.lineSeparator), 10000, allowTruncation = true).map(bs => bs.utf8String)
// Given a stream of Paths, we read those files and count the number of lines
val lineCounter = Flow[Path].flatMapConcat(path => FileIO.fromPath(path).via(lines)).fold(0L)((count, line) => count + 1).toMat(Sink.head)(Keep.right)
// Here's our test data source (replace paths with real paths)
val testFiles = Source(List("somePathToFile1", "somePathToFile2").map(new File(_).toPath))
// Runs the line counter over the test files, returns a Future, which contains the number of lines, which we then print out to the console when it completes
testFiles runWith lineCounter foreach println
Update: Oh, I didn't see the accepted answer because I didn't refresh the page >_<. I'll leave this here anyway since I've also added some notes about error handling.
I believe the following program does what you want:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
import java.nio.file.Paths
import scala.concurrent.duration._
object TestMain extends App {
implicit val actorSystem = ActorSystem("test")
implicit val materializer = ActorMaterializer()
implicit def ec = actorSystem.dispatcher
val sources = Vector("build.sbt", ".gitignore")
.map(Paths.get(_))
.map(p =>
FileIO.fromPath(p)
.viaMat(Framing.delimiter(ByteString(System.lineSeparator()), Int.MaxValue, allowTruncation = true))(Keep.left)
.mapMaterializedValue { f =>
f.onComplete {
case Success(r) if r.wasSuccessful => println(s"Read ${r.count} bytes from $p")
case Success(r) => println(s"Something went wrong when reading $p: ${r.getError}")
case Failure(NonFatal(e)) => println(s"Something went wrong when reading $p: $e")
}
NotUsed
}
)
val finalSource = Source(sources).flatMapConcat(identity)
val result = finalSource.map(_ => 1).runWith(Sink.reduce[Int](_ + _))
result.onComplete {
case Success(n) => println(s"Read $n lines total")
case Failure(e) => println(s"Reading failed: $e")
}
Await.ready(result, 10.seconds)
actorSystem.terminate()
}
The key here is the flatMapConcat() method: it transforms each element of a stream into a source and returns a stream of the elements yielded by these sources, as if they were run sequentially, one after another.
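To see the ordering guarantee in isolation, here is a minimal sketch that uses in-memory sources instead of files. The object name FlatMapConcatSketch and the helper flattenInOrder are made up for illustration; the sketch assumes the same akka-streams 2.4.x setup as the program above.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the original program.
object FlatMapConcatSketch {
  // Runs a tiny in-memory stream and returns everything it emitted, in order.
  def flattenInOrder(): immutable.Seq[String] = {
    implicit val system = ActorSystem("sketch")
    implicit val materializer = ActorMaterializer()
    try {
      val result = Source(List("a", "b", "c"))
        // Each element becomes a sub-source; the next sub-source is only
        // started once the previous one has completed.
        .flatMapConcat(prefix => Source(List(prefix + "1", prefix + "2")))
        .runWith(Sink.seq[String])
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(flattenInOrder().mkString(", ")) // a1, a2, b1, b2, c1, c2
}
```

Replacing the inner in-memory source with FileIO.fromPath(...) gives the file-concatenation behaviour used above: the second file is not opened until the first has been read to the end.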
As for handling errors, you can either add a handler to the future in the mapMaterializedValue argument, or you can handle the final error of the running stream by putting a handler on the future materialized by the sink (Sink.reduce in the example above). I did both in the example above, and if you test it on, say, a nonexistent file, you'll see the same error reported twice. Unfortunately, flatMapConcat() does not collect materialized values, and frankly I can't see how it could do so sanely, so you have to handle them separately if necessary.
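The second option, handling the failure on the future materialized by the sink, can be sketched without touching the file system. In the sketch below a deliberately failing sub-source stands in for an unreadable file; FailurePropagationSketch and run are hypothetical names, and the setup again assumes akka-streams 2.4.x.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical example object, not part of the original program.
object FailurePropagationSketch {
  // Returns Success(elements) if the stream completes, Failure(cause) otherwise.
  def run(): Try[immutable.Seq[Int]] = {
    implicit val system = ActorSystem("sketch")
    implicit val materializer = ActorMaterializer()
    try {
      val subSources = List(
        Source.single(1),
        // Stand-in for a file that cannot be read.
        Source.failed[Int](new RuntimeException("boom"))
      )
      val result = Source(subSources)
        .flatMapConcat(identity)
        .runWith(Sink.seq[Int])
      // A failure in any sub-source fails the whole stream, so it
      // shows up on the future materialized by the sink.
      Try(Await.result(result, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Here run() comes back as a Failure wrapping the "boom" exception, even though the first sub-source emitted its element successfully. If partial results matter, they need to be collected some other way than through the sink's final value.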