Reading a CSV file using Akka Streams

I'm reading a CSV file with Akka Streams so that I can build a graph of actions to perform on each line. I've got the following toy example up and running.

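  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.Sink
  import scala.io.Source

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("MyAkkaSystem")
    implicit val materializer = ActorMaterializer()

    // akka.stream.scaladsl.Source wraps the line iterator from scala.io.Source
    val source = akka.stream.scaladsl.Source.fromIterator(() => Source.fromFile("a.csv").getLines())
    val sink = Sink.foreach(println)
    source.runWith(sink)
  }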

The two Source types don't sit easy with me. Is this idiomatic, or is there a better way to write this?

M.K. asked Oct 24 '16 17:10


3 Answers

Akka Streams provides FileIO.fromPath to read directly from a file.

FileIO.fromPath(Paths.get("a.csv"))
      .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
      .runForeach(println)

Here, the runForeach method prints each line. If you have a proper Sink to process the lines, use it instead. For example, to split each line on ',' and print the number of fields in it:

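import akka.Done
import scala.concurrent.Future

// Sink takes two type parameters: the element type and the materialized value
val sink: Sink[String, Future[Done]] = Sink.foreach[String](x => println(x.split(",").size))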

FileIO.fromPath(Paths.get("a.csv"))
      .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
      .to(sink)
      .run()
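
One caveat with counting fields via split(","): on the JVM, String.split drops trailing empty strings, so a line that ends in empty fields is under-counted. The sketch below is a minimal illustration using only the standard library; fieldCount is a hypothetical helper name, not part of Akka, and it assumes fields never contain quoted commas.

```scala
object FieldCount {
  // Hypothetical helper (not an Akka API): counts comma-separated fields in one line.
  // The -1 limit keeps trailing empty fields, which plain split(",") would drop.
  // Assumption: no quoted fields containing commas.
  def fieldCount(line: String): Int = line.split(",", -1).length

  def main(args: Array[String]): Unit = {
    println("a,b,,".split(",").length) // trailing empty fields are dropped here
    println(fieldCount("a,b,,"))       // trailing empty fields are kept here
  }
}
```

Under these assumptions, the function could be used inside the sink in place of x.split(",").size. For files with quoted fields, a real CSV parser is the safer choice.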

fcat answered Oct 21 '22 05:10


The idiomatic way to read a CSV file with Akka Streams is to use the Alpakka CSV connector. The following example reads a CSV file, converts each row to a map from column name (taken from the first line of the file) to ByteString value, transforms the ByteString values to String values, and prints each resulting map:

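import java.nio.file.Paths
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.FileIO

FileIO.fromPath(Paths.get("a.csv"))
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMap())
  .map(_.mapValues(_.utf8String))
  .runForeach(println)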
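
To make the shape of the output concrete, here is a rough standard-library sketch of the header-to-map idea described above. It is not how the Alpakka connector is implemented: toMaps is a hypothetical name, and the naive split on ',' assumes no quoting or escaping, which the real parser does handle.

```scala
object HeaderMapSketch {
  // Hypothetical illustration only: treats the first line as column names and
  // pairs each later line's fields with those names.
  // Assumption: fields are separated by plain commas with no quoting.
  def toMaps(lines: Iterator[String]): Iterator[Map[String, String]] =
    if (!lines.hasNext) Iterator.empty
    else {
      val header = lines.next().split(",", -1).toList
      lines.map(line => header.zip(line.split(",", -1).toList).toMap)
    }

  def main(args: Array[String]): Unit =
    toMaps(Iterator("id,name", "1,Ann", "2,Bob")).foreach(println)
}
```

Each element printed by the stream in the answer has this same form: one map per data row, keyed by column name.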

Jeffrey Chung answered Oct 21 '22 05:10


Try this:

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object ReadStreamApp extends App {
  implicit val actorSystem = ActorSystem()
  import actorSystem.dispatcher
  implicit val flowMaterializer = ActorMaterializer()

  val logFile = Paths.get("src/main/resources/a.csv")

  val source = FileIO.fromPath(logFile)

  val flow = Framing
    .delimiter(ByteString(System.lineSeparator()), maximumFrameLength = 512, allowTruncation = true)
    .map(_.utf8String)

  val sink = Sink.foreach(println)

  source
    .via(flow)
    .runWith(sink)
    .andThen {
      case _ =>
        actorSystem.terminate()
        // 1.minute avoids the postfix-operator syntax, which needs a language import
        Await.ready(actorSystem.whenTerminated, 1.minute)
    }
}

Dmitry Kaltovich answered Oct 21 '22 05:10