I use Scalaz 7's iteratees in a number of projects, primarily for processing large-ish files. I'd like to start switching to Scalaz streams, which are designed to replace the iteratee package (which frankly is missing a lot of pieces and is kind of a pain to use).
Streams are based on machines (another variation on the iteratee idea), which have also been implemented in Haskell. I've used the Haskell machines library a bit, but the relationship between machines and streams isn't completely obvious (to me, at least), and the documentation for the streams library is still a little sparse.
This question is about a simple parsing task that I'd like to see implemented using streams instead of iteratees. I'll answer the question myself if nobody else beats me to it, but I'm sure I'm not the only one who's making (or at least considering) this transition, and since I need to work through this exercise anyway, I figured I might as well do it in public.
Supposed I've got a file containing sentences that have been tokenized and tagged with parts of speech:
no UH , , it PRP was VBD n't RB monday NNP . . the DT equity NN market NN was VBD illiquid JJ . .
There's one token per line, words and parts of speech are separated by a single space, and blank lines represent sentence boundaries. I want to parse this file and return a list of sentences, which we might as well represent as lists of tuples of strings:
List((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.)) List((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.)
As usual, we want to fail gracefully if we hit invalid input or file reading exceptions, we don't want to have to worry about closing resources manually, etc.
First for some general file reading stuff (that really ought to be part of the iteratee package, which currently doesn't provide anything remotely this high-level):
import java.io.{ BufferedReader, File, FileReader } import scalaz._, Scalaz._, effect.IO import iteratee.{ Iteratee => I, _ } type ErrorOr[A] = EitherT[IO, Throwable, A] def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B]( EitherT(action.catchLeft).map(I.sdone(_, I.emptyInput)) ) def enumBuffered(r: => BufferedReader) = new EnumeratorT[String, ErrorOr] { lazy val reader = r def apply[A] = (s: StepT[String, ErrorOr, A]) => s.mapCont(k => tryIO(IO(Option(reader.readLine))).flatMap { case None => s.pointI case Some(line) => k(I.elInput(line)) >>== apply[A] } ) } def enumFile(f: File) = new EnumeratorT[String, ErrorOr] { def apply[A] = (s: StepT[String, ErrorOr, A]) => tryIO( IO(new BufferedReader(new FileReader(f))) ).flatMap(reader => I.iterateeT[String, ErrorOr, A]( EitherT( enumBuffered(reader).apply(s).value.run.ensuring(IO(reader.close())) ) )) }
And then our sentence reader:
def sentence: IterateeT[String, ErrorOr, List[(String, String)]] = { import I._ def loop(acc: List[(String, String)])(s: Input[String]): IterateeT[String, ErrorOr, List[(String, String)]] = s( el = _.trim.split(" ") match { case Array(form, pos) => cont(loop(acc :+ (form, pos))) case Array("") => cont(done(acc, _)) case pieces => val throwable: Throwable = new Exception( "Invalid line: %s!".format(pieces.mkString(" ")) ) val error: ErrorOr[List[(String, String)]] = EitherT.left( throwable.point[IO] ) IterateeT.IterateeTMonadTrans[String].liftM(error) }, empty = cont(loop(acc)), eof = done(acc, eofInput) ) cont(loop(Nil)) }
And finally our parsing action:
val action = I.consume[List[(String, String)], ErrorOr, List] %= sentence.sequenceI &= enumFile(new File("example.txt"))
We can demonstrate that it works:
scala> action.run.run.unsafePerformIO().foreach(_.foreach(println)) List((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.)) List((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.))
And we're done.
More or less the same program implemented using Scalaz streams instead of iteratees.
A scalaz-stream solution:
import scalaz.std.vector._ import scalaz.syntax.traverse._ import scalaz.std.string._ val action = linesR("example.txt").map(_.trim). splitOn("").flatMap(_.traverseU { s => s.split(" ") match { case Array(form, pos) => emit(form -> pos) case _ => fail(new Exception(s"Invalid input $s")) }})
We can demonstrate that it works:
scala> action.collect.attempt.run.foreach(_.foreach(println)) Vector((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.)) Vector((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.))
And we're done.
The traverseU
function is a common Scalaz combinator. In this case it's being used to traverse, in the Process
monad, the sentence Vector
generated by splitOn
. It's equivalent to map
followed by sequence
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With