Reading a CSV files using Akka Streams - based on this question. I have CSV read using Akka Streams. Now I need to perform process it line by line - but I also need to know what were the names of headers. Any options?
UPD. Clarifying a bit.
FileIO.fromPath(Paths.get("a.csv))
.via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
.runForeach(println /* header + current line for each line*/)
You can use prefixAndTail to take the first element - in this case the headers - and then combine it lazily with subsequent elements (the rows).
Here's an example with the headers and columns combined to form a Map[String, String]:
val flow: Flow[Seq[String], Map[String, String], NotUsed] = Flow[Seq[String]]
.prefixAndTail(1).flatMapConcat { case (headers, rows) =>
rows.map (row => headers.head.zip(row).toMap)
}
val test: Source[Seq[String], NotUsed] = Source(
List(Seq("col1", "col2"), Seq("a", "b"), Seq("1", "2")))
Await.result(test.via(flow).runForeach(println), 20.seconds)
// Map(col1 -> a, col2 -> b)
// Map(col1 -> 1, col2 -> 2)
Alpakka, the collection of Akka Streams connectors, provides CSV support:
Source
.single(ByteString("""header1,header2,header3
|1,2,3
|4,5,6""".stripMargin))
.via(CsvParsing.lineScanner())
.via(CsvToMap.toMap())
.map(_.mapValues(_.utf8String))
.runForeach(println)
// Map(header1 -> 1, header2 -> 2, header3 -> 3)
// Map(header1 -> 4, header2 -> 5, header3 -> 6)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With