 

Scala: Iterate over CSV files in a functional way?

I have CSV files with comments that give column names, where the columns change throughout the file:

#c1,c2,c3
a,b,c
d,e,f
#c4,c5
g,h
i,j

I want to provide a way to iterate over (only) the data rows of the file as Maps of column names to values (all Strings). So the above would become:

Map(c1 -> a, c2 -> b, c3 -> c)
Map(c1 -> d, c2 -> e, c3 -> f)
Map(c4 -> g, c5 -> h)
Map(c4 -> i, c5 -> j)
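Each output row depends only on the most recent header comment and the current data line, so the per-row step is a pure function. Here is a minimal sketch. It assumes plain comma splitting with no quoted fields, and that the header keeps its leading `#`. `RowToMap` and `rowToMap` are hypothetical names:

```scala
object RowToMap {
  // Turn a header comment such as "#c1,c2,c3" and a data line such as "a,b,c"
  // into Map(c1 -> a, c2 -> b, c3 -> c).
  // Assumption: simple comma splitting, no quoting or escaping.
  def rowToMap(header: String, line: String): Map[String, String] = {
    val keys   = header.stripPrefix("#").split(",", -1).toVector
    val values = line.split(",", -1).toVector
    // zip truncates to the shorter side if a row has too few or too many fields
    keys.zip(values).toMap
  }
}
```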

The files are very large, so reading everything into memory is not an option. Right now I have an Iterator class that keeps some ugly state between hasNext() and next(); I also provide accessors for the current line number and the actual last line and comment read (in case consumers care about field order). I'd like to do this in a more functional way.

My first idea was a for comprehension: I can iterate over the lines of the file, skipping the comment lines with a filter clause, and yield a tuple containing the map, the line number, etc. The problem is that I need to remember the last column names seen so I can build the Maps from them. For comprehensions understandably discourage keeping state by only letting you bind new vals. I learned from this question that I can update member variables in the yield block, but in my case that is precisely when I don't want to update them: the header has to change on the comment lines, which the filter skips, not on the rows that get yielded.
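The stateless part of the for-comprehension idea is easy. This small sketch uses only the Scala 2.13/3 standard library, and `DataLines`/`dataLines` are hypothetical names. It filters out comment lines and yields 1-based line numbers. What it cannot do on its own is remember the last header seen:

```scala
object DataLines {
  // Skip comment lines with a filter clause and attach 1-based line numbers.
  // Note that nothing here carries the most recent header forward.
  def dataLines(lines: Iterator[String]): Iterator[(Int, String)] =
    for {
      (line, idx) <- lines.zipWithIndex
      if !line.startsWith("#")
    } yield (idx + 1, line)
}
```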

I could call a function in the iteration clause that updates state, but that seems dirty. So, what is the best way to do this in a functional style? Abuse for comprehensions? Hack scanLeft? Use a library? Bring out the parser combinator big guns? Or is a functional style just not a good match for this problem?
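For comparison, the "hack scanLeft" option does not need much hacking. This is a minimal sketch assuming Scala 2.13+ and simple comma splitting; `ScanRows`, `St` and `rows` are hypothetical names. `scanLeft` threads a small immutable state (current columns, plus the row produced by the line just read). `flatMap` then drops the states that produced no row:

```scala
object ScanRows {
  // State threaded by scanLeft: the columns from the most recent header,
  // and the row (if any) produced by the line just read.
  final case class St(columns: Vector[String], row: Option[Map[String, String]])

  // Iterator.scanLeft is lazy, so lines are consumed one at a time.
  def rows(lines: Iterator[String]): Iterator[Map[String, String]] =
    lines
      .scanLeft(St(Vector.empty, None)) { (st, line) =>
        if (line.startsWith("#"))
          St(line.drop(1).split(",", -1).toVector, None) // new header, no row
        else
          St(st.columns, Some(st.columns.zip(line.split(",", -1).toVector).toMap))
      }
      .flatMap(_.row) // skip the seed state and the header lines
}
```

Because everything stays an `Iterator`, feeding it `scala.io.Source.fromFile(path).getLines()` should process a large file line by line rather than loading it whole.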

Asked Sep 12 '11 at 15:09 by Jay Hacker


2 Answers

State Monad FTW!

Actually, I suck at the State monad. I had a hell of a time writing this up, and I have a strong feeling that it could be made much better. In particular, it seems to me that traverse is the way to go, but...

// Get Scalaz on the job (written against the Scalaz 6 API, e.g. the lowercase state(...) constructor)
import scalaz._
import Scalaz._

// Some type aliases to make stuff clearer
type Input         = Stream[String]
type Header        = String
type InternalState = (Input, Header)
type Output        = Option[(Header, String)]
type MyState       = State[InternalState, Output]

// Detect headers
def isHeader(line: String) = line.startsWith("#") // safe on empty lines, unlike line(0)

// From a state, produce an output
def makeLine: (InternalState => Output) = {
    case (head #:: _, _) if isHeader(head) => None
    case (head #:: _, header)              => Some(header -> head)
    case _                                 => None
}

// From a state, produce the next state
def nextLine: (InternalState => InternalState) = {
    case (head #:: tail, _) if isHeader(head) => tail -> head
    case (_ #:: tail, header)                 => tail -> header
    case _                                    => Stream.empty -> ""
}

// My state is defined by the functions producing the next state
// and the output
val myState: MyState = state(s => nextLine(s) -> makeLine(s))    

// Some input to test it. I'm trimming it to avoid problems on REPL
val input = """#c1,c2,c3
a,b,c
d,e,f
#c4,c5
g,h
i,j""".split("\n").map(_.trim).toStream // split avoids clashing with java.lang.String#lines on JDK 11+

// My State/Output Stream -- def to avoid keeping a reference to the head
def stateOutputStream = Stream.iterate(myState(input, "")){ 
        case (s, _) => myState(s) 
    } takeWhile { case ((stream, _), output) => stream.nonEmpty || output.nonEmpty }

// My Output Stream -- flatMap gets rid of the None from the headers
def outputStream = stateOutputStream flatMap { case (_, output) => output }

// Now just get the map
def outputToMap: (Header, String) => Map[String, String] = {
    case (header, line) =>
        val keys = header substring 1 split ","
        val values = line split ","
        (keys zip values).toMap // parenthesized: postfix toMap needs a feature import in Scala 2.10+
}

// And this is the result -- note that I'm still avoiding "val" so memory
// won't leak
def result = outputStream map outputToMap.tupled
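The same idea can also be written without Scalaz: a step function maps the current state to an output plus the next state, and header lines produce `None`. The sketch below uses the standard library's `Iterator.unfold` (Scala 2.13+). It is not the answer's code. `UnfoldRows`, `step`, `pairs` and `rows` are hypothetical names, and `step` combines `makeLine` and `nextLine` into a single function:

```scala
object UnfoldRows {
  // Same shape as the answer's InternalState: remaining input and current header.
  type InternalState = (LazyList[String], String)
  type Output        = Option[(String, String)]

  // One step: emit an output and the next state (the state keeps only the unconsumed suffix).
  // Returning None ends the unfold once the input is exhausted.
  private def step(s: InternalState): Option[(Output, InternalState)] = {
    val (input, header) = s
    if (input.isEmpty) None
    else {
      val line = input.head
      if (line.startsWith("#")) Some((None, (input.tail, line)))  // header: no output
      else Some((Some(header -> line), (input.tail, header)))     // data: pair with header
    }
  }

  // flatten drops the None outputs produced by header lines.
  def pairs(lines: Iterator[String]): Iterator[(String, String)] =
    Iterator.unfold[Output, InternalState]((LazyList.from(lines), ""))(step).flatten

  def rows(lines: Iterator[String]): Iterator[Map[String, String]] =
    pairs(lines).map { case (header, line) =>
      header.drop(1).split(",", -1).toVector.zip(line.split(",", -1).toVector).toMap
    }
}
```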
Answered Oct 23 '22 at 13:10 by Daniel C. Sobral


Here's one way you could do this with Iteratees. The stream is represented as a function from Iteratee to Iteratee, so it's never actually realized in memory. I'm using the State monad to track the last encountered header.

import scalaz._
import Scalaz._
import IterV._

type Header = List[String]
type MyState[A] = State[Header, A]
type Out = Map[String, String]

// Detect headers
def isHeader(line: String) = line.startsWith("#") // safe on empty lines, unlike line(0)

type Enumeratee[A, B, C] =
  IterV[B, C] => Iteratee[MyState, A, IterV[B, C]]

// Enumerate a list. Just for demonstration.
def enumerateM[M[_]: Monad, E, A]:
  (List[E], Iteratee[M, E, A]) => Iteratee[M, E, A] = {
    case (Nil, i) => i
    case (x :: xs, Iteratee(m)) => Iteratee(for {
      v <- m
      o <- v match {
        case d@DoneM(_, _) => d.pure[M]
        case ContM(k) => enumerateM.apply(xs, k(El(x))).value
      }
    } yield o)
  }

def stateTrans[A]: Enumeratee[String, Map[String, String], A] =
  i => Iteratee(i.fold(
         done = (_, _) => DoneM(i, Empty.apply).pure[MyState],
         cont = k => ContM((x: Input[String]) => x match {
           case El(e) => Iteratee[MyState, String, IterV[Out, A]](for {
             h <- init
             o <- if (isHeader(e))
                    put(e.substring(1).split(",").toList) map (_ => Empty[Out])
                  else El((h zip (e split ",")).toMap).pure[MyState]
             v <- stateTrans(k(o)).value
           } yield v)
           case Empty() => stateTrans(k(Empty.apply))
           case EOF() => stateTrans(k(EOF.apply))
         }).pure[MyState]
       ))

Let's test this and take the head of the output stream:

scala> (enumerateM[MyState, String, IterV[Out, Option[Out]]].apply(
     | List("#c1,c2,c3","a,b,c","d,e,f"), stateTrans(head)).value ! List())
     | match { case DoneM(a, _) => a match { case Done(b, _) => b } }
res0: Option[Out] = Some(Map(c1 -> a, c2 -> b, c3 -> c))

This could be made much nicer by abstracting some of this stuff out to helper functions.
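The following minimal hand-rolled sketch makes the iteratee idea concrete without depending on a particular Scalaz version. All names (`Iter`, `Done`, `Cont`, `El`, `EOF`, `enumerate`, `rowsOf`, `take`, `run`) are hypothetical and only loosely mirror the answer's vocabulary. They are not Scalaz's API. One deliberate swap: the header is threaded through the enumeratee's recursion as a plain argument instead of through the State monad.

```scala
object MiniIteratee {
  // Hypothetical minimal iteratee types (not Scalaz's IterV API).
  sealed trait Input[+E]
  final case class El[E](e: E) extends Input[E]
  case object EOF extends Input[Nothing]

  sealed trait Iter[E, A]
  final case class Done[E, A](a: A) extends Iter[E, A]
  final case class Cont[E, A](k: Input[E] => Iter[E, A]) extends Iter[E, A]

  // Enumerator: feeds lines into an iteratee (Iter => Iter), stopping as soon
  // as the iteratee is Done, so the rest of the source is never read.
  @annotation.tailrec
  def enumerate[E, A](src: Iterator[E], it: Iter[E, A]): Iter[E, A] = it match {
    case Cont(k) if src.hasNext => enumerate(src, k(El(src.next())))
    case other                  => other
  }

  // Signal end of input and extract the result, if the iteratee produces one.
  def run[E, A](it: Iter[E, A]): Option[A] = it match {
    case Done(a) => Some(a)
    case Cont(k) => k(EOF) match {
      case Done(a) => Some(a)
      case Cont(_) => None
    }
  }

  type Row = Map[String, String]

  // Enumeratee: adapts an iteratee over rows into one over raw lines,
  // carrying the most recent header as an explicit argument.
  def rowsOf[A](inner: Iter[Row, A], header: Vector[String] = Vector.empty): Iter[String, A] =
    inner match {
      case Done(a) => Done(a)
      case Cont(k) => Cont[String, A] {
        case El(line) if line.startsWith("#") =>
          rowsOf(inner, line.drop(1).split(",", -1).toVector) // new header, nothing emitted
        case El(line) =>
          rowsOf(k(El(header.zip(line.split(",", -1).toVector).toMap)), header)
        case EOF =>
          rowsOf(k(EOF), header)
      }
    }

  // Example consumer: collect the first n elements.
  def take[A](n: Int, acc: Vector[A] = Vector.empty): Iter[A, Vector[A]] =
    if (n <= 0) Done(acc)
    else Cont[A, Vector[A]] {
      case El(a) => take(n - 1, acc :+ a)
      case EOF   => Done(acc)
    }
}
```

Because `enumerate` stops as soon as the iteratee is `Done`, asking for the first two rows leaves the remaining input unread.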

Answered Oct 23 '22 at 14:10 by Apocalisp