I'm reading a csv file. I am using Akka Streams to do this so that I can create a graph of actions to perform on each line. I've got the following toy example up and running.
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("MyAkkaSystem")
implicit val materializer = ActorMaterializer()
val source = akka.stream.scaladsl.Source.fromIterator(Source.fromFile("a.csv").getLines)
val sink = Sink.foreach(println)
source.runWith(sink)
}
The two Source
types don't sit easy with me. Is this idiomatic or is there is a better way to write this?
Actually, akka-streams
provides a function to directly read from a file.
FileIO.fromPath(Paths.get("a.csv"))
.via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
.runForeach(println)
Here, runForeach
method is to print the lines. If you have a proper Sink
to process these lines, use it instead of this function. For example, if you want to split the lines by '
and print the total number of words in it:
val sink: Sink[String] = Sink.foreach(x => println(x.split(",").size))
FileIO.fromPath(Paths.get("a.csv"))
.via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
.to(sink)
.run()
The idiomatic way to read a CSV file with Akka Streams is to use the Alpakka CSV connector. The following example reads a CSV file, converts it to a map of column names (assumed to be the first line in the file) and ByteString
values, transforms the ByteString
values to String
values, and prints each line:
FileIO.fromPath(Paths.get("a.csv"))
.via(CsvParsing.lineScanner())
.via(CsvToMap.toMap())
.map(_.mapValues(_.utf8String))
.runForeach(println)
Try this:
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
object ReadStreamApp extends App {
implicit val actorSystem = ActorSystem()
import actorSystem.dispatcher
implicit val flowMaterializer = ActorMaterializer()
val logFile = Paths.get("src/main/resources/a.csv")
val source = FileIO.fromPath(logFile)
val flow = Framing
.delimiter(ByteString(System.lineSeparator()), maximumFrameLength = 512, allowTruncation = true)
.map(_.utf8String)
val sink = Sink.foreach(println)
source
.via(flow)
.runWith(sink)
.andThen {
case _ =>
actorSystem.terminate()
Await.ready(actorSystem.whenTerminated, 1 minute)
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With