I'm trying to write an enumerator for reading files line by line from a java.io.BufferedReader using Scalaz 7's iteratee library, which currently only provides an (extremely slow) enumerator for java.io.Reader.
The problems I'm running into are related to the fact that all of the other iteratee libraries I've used (e.g. Play 2.0's and John Millikin's enumerator for Haskell) have had an error state as one of their Step type's constructors, and Scalaz 7 doesn't.
Here's what I currently have. First, some imports and IO wrappers:
import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect.IO, iteratee.{ Iteratee => I, _ }
def openFile(f: File) = IO(new BufferedReader(new FileReader(f)))
def readLine(r: BufferedReader) = IO(Option(r.readLine))
def closeReader(r: BufferedReader) = IO(r.close())
And a type alias to clean things up a bit:
type ErrorOr[A] = Either[Throwable, A]
And now a tryIO helper, modeled (loosely, and probably wrongly) on the one in enumerator:
def tryIO[A, B](action: IO[B]) = I.iterateeT[A, IO, ErrorOr[B]](
  action.catchLeft.map(
    r => I.sdone(r, r.fold(_ => I.eofInput, _ => I.emptyInput))
  )
)
An enumerator for the BufferedReader itself:
def enumBuffered(r: => BufferedReader) = new EnumeratorT[ErrorOr[String], IO] {
  lazy val reader = r
  def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
    tryIO(readLine(reader)) flatMap {
      case Right(None)       => s.pointI
      case Right(Some(line)) => k(I.elInput(Right(line))) >>== apply[A]
      case Left(e)           => k(I.elInput(Left(e)))
    }
  )
}
And finally an enumerator that's responsible for opening and closing the reader:
def enumFile(f: File) = new EnumeratorT[ErrorOr[String], IO] {
  def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
    tryIO(openFile(f)) flatMap {
      case Right(reader) => I.iterateeT(
        enumBuffered(reader).apply(s).value.ensuring(closeReader(reader))
      )
      case Left(e) => k(I.elInput(Left(e)))
    }
  )
}
Now suppose, for example, that I want to collect all the lines in a file that contain at least twenty-five '0' characters into a list. I can write:
val action: IO[ErrorOr[List[String]]] = (
  I.consume[ErrorOr[String], IO, List] %=
  I.filter(_.fold(_ => true, _.count(_ == '0') >= 25)) &=
  enumFile(new File("big.txt"))
).run.map(_.sequence)
In many ways this seems to work beautifully: I can kick the action off with unsafePerformIO and it will chunk through tens of millions of lines and gigabytes of data in a couple of minutes, in constant memory and without blowing the stack, and then close the reader when it's done. If I give it the name of a file that doesn't exist, it will dutifully give me back the exception wrapped in a Left, and enumBuffered at least seems to behave appropriately if it hits an exception while reading.
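As a baseline for comparison, the observable behaviour described above (filter lines, always close the reader, report failures as a Left) can be sketched with the standard library alone. This is only a sketch under assumptions: linesWithZeros and LineFilterSketch are hypothetical names, nothing here uses Scalaz, and it needs Scala 2.12 or later for Try.toEither.

```scala
import java.io.{BufferedReader, Reader}
import scala.util.Try

object LineFilterSketch {
  // Hypothetical helper (not part of Scalaz): reads every line from the
  // reader produced by `open`, keeps the lines containing at least `min`
  // '0' characters, always closes the reader, and turns any exception
  // (while opening or reading) into a Left.
  def linesWithZeros(open: () => Reader, min: Int): Either[Throwable, List[String]] =
    Try {
      val reader = new BufferedReader(open())
      try {
        Iterator
          .continually(reader.readLine()) // readLine returns null at end of input
          .takeWhile(_ != null)
          .filter(_.count(_ == '0') >= min)
          .toList                         // forces the reads before `finally` runs
      } finally reader.close()
    }.toEither
}
```

Calling it with a reader over "000\n0\n1000\n" and min = 3 keeps the first and third lines, and a failing open function yields a Left instead of throwing.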
I have some concerns about my implementation, though, particularly of tryIO. For example, suppose I try to compose a few iteratees:
val it = for {
  _ <- tryIO[Unit, Unit](IO(println("a")))
  _ <- tryIO[Unit, Unit](IO(throw new Exception("!")))
  r <- tryIO[Unit, Unit](IO(println("b")))
} yield r
If I run this, I get the following:
scala> it.run.unsafePerformIO()
a
b
res11: ErrorOr[Unit] = Right(())
If I try the same thing with enumerator in GHCi, the result is more like what I'd expect:
...> run $ tryIO (putStrLn "a") >> tryIO (error "!") >> tryIO (putStrLn "b")
a
Left !
I just don't see a way to get this behavior without an error state in the iteratee library itself.
I don't claim to be any kind of expert on iteratees, but I have used the various Haskell implementations in a few projects, feel like I more or less understand the fundamental concepts, and had coffee with Oleg once. I'm at a loss here, though. Is this a reasonable way to handle exceptions in the absence of an error state? Is there a way to implement tryIO that would behave more like the enumerator version? Is there some kind of time bomb waiting for me in the fact that my implementation behaves differently?
EDIT: Here is the real solution. I left in the original post because I think it's worthwhile seeing the pattern. What works for Kleisli works for IterateeT.
import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }

object IterateeIOExample {
  type ErrorOr[+A] = EitherT[IO, Throwable, A]

  def openFile(f: File) = IO(new BufferedReader(new FileReader(f)))
  def readLine(r: BufferedReader) = IO(Option(r.readLine))
  def closeReader(r: BufferedReader) = IO(r.close())

  def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B] {
    EitherT.fromEither(action.catchLeft).map(r => I.sdone(r, I.emptyInput))
  }

  def enumBuffered(r: => BufferedReader) = new EnumeratorT[String, ErrorOr] {
    lazy val reader = r
    def apply[A] = (s: StepT[String, ErrorOr, A]) => s.mapCont(k =>
      tryIO(readLine(reader)) flatMap {
        case None       => s.pointI
        case Some(line) => k(I.elInput(line)) >>== apply[A]
      })
  }

  def enumFile(f: File) = new EnumeratorT[String, ErrorOr] {
    def apply[A] = (s: StepT[String, ErrorOr, A]) =>
      tryIO(openFile(f)).flatMap(reader => I.iterateeT[String, ErrorOr, A](
        EitherT(
          enumBuffered(reader).apply(s).value.run.ensuring(closeReader(reader)))))
  }

  def main(args: Array[String]): Unit = {
    val action = (
      I.consume[String, ErrorOr, List] %=
      I.filter(a => a.count(_ == '0') >= 25) &=
      enumFile(new File(args(0)))).run.run
    println(action.unsafePerformIO().map(_.size))
  }
}
===== Original Post =====
I feel like you need an EitherT in the mix. Without EitherT you just end up with three independent Lefts or Rights. With EitherT, the first Left would propagate and the remaining steps would be skipped.
I think what you really want is
type ErrorOr[+A] = EitherT[IO, Throwable, A]
I.iterateeT[A, ErrorOr, B]
The following code mimics how you are currently composing things. Because IterateeT has no concept of left and right, when you compose it, you just end up with a bunch of IO/Id's.
scala> Kleisli((a:Int) => 4.right[String].point[Id])
res11: scalaz.Kleisli[scalaz.Scalaz.Id,Int,scalaz.\/[String,Int]] = scalaz.KleisliFunctions$$anon$18@73e771ca
scala> Kleisli((a:Int) => "aa".left[Int].point[Id])
res12: scalaz.Kleisli[scalaz.Scalaz.Id,Int,scalaz.\/[String,Int]] = scalaz.KleisliFunctions$$anon$18@be41b41
scala> for { a <- res11; b <- res12 } yield (a,b)
res15: scalaz.Kleisli[scalaz.Scalaz.Id,Int,(scalaz.\/[String,Int], scalaz.\/[String,Int])] = scalaz.KleisliFunctions$$anon$18@42fd1445
scala> res15.run(1)
res16: (scalaz.\/[String,Int], scalaz.\/[String,Int]) = (\/-(4),-\/(aa))
In the following code, instead of using Id, we use an EitherT. Since EitherT has the same bind behaviour as Either, we end up with what we want.
scala> type ErrorOr[+A] = EitherT[Id, String, A]
defined type alias ErrorOr
scala> Kleisli[ErrorOr, Int, Int]((a:Int) => EitherT(4.right[String].point[Id]))
res22: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@58b547a0
scala> Kleisli[ErrorOr, Int, Int]((a:Int) => EitherT("aa".left[Int].point[Id]))
res24: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@342f2ceb
scala> for { a <- res22; b <- res24 } yield 2
res25: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@204eab31
scala> res25.run(2).run
res26: scalaz.Scalaz.Id[scalaz.\/[String,Int]] = -\/(aa)
You can replace Kleisli with IterateeT and Id with IO to get what you need.
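To see the short-circuiting in isolation, here is a minimal sketch using only the standard library (Scala 2.12+). MiniIO and MiniEitherT are hypothetical stand-ins written for this example, not the Scalaz types, and this tryIO is a simplified analogue of the one in the question: it only shows that binding through an EitherT-shaped wrapper stops at the first Left.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

object EitherTSketch {
  // Hypothetical stand-in for an IO type: a suspended computation.
  final class MiniIO[A](thunk: () => A) {
    def unsafePerformIO(): A = thunk()
    def map[B](f: A => B): MiniIO[B] = MiniIO(f(thunk()))
    def flatMap[B](f: A => MiniIO[B]): MiniIO[B] =
      MiniIO(f(thunk()).unsafePerformIO())
  }
  object MiniIO {
    def apply[A](a: => A): MiniIO[A] = new MiniIO(() => a)
  }

  // Hypothetical stand-in for EitherT[IO, Throwable, A].
  final case class MiniEitherT[A](run: MiniIO[Either[Throwable, A]]) {
    def map[B](f: A => B): MiniEitherT[B] = MiniEitherT(run.map(_.map(f)))
    // The bind that matters: a Left skips everything that follows.
    def flatMap[B](f: A => MiniEitherT[B]): MiniEitherT[B] =
      MiniEitherT(run.flatMap[Either[Throwable, B]] {
        case Left(e)  => MiniIO[Either[Throwable, B]](Left(e))
        case Right(a) => f(a).run
      })
  }

  // Simplified analogue of tryIO: capture exceptions as a Left.
  def tryIO[A](action: => A): MiniEitherT[A] =
    MiniEitherT(MiniIO(Try(action).toEither))

  // Records side effects so the order of execution can be inspected.
  val log: ListBuffer[String] = ListBuffer.empty[String]
  def say(s: String): Unit = { log += s; () }

  val it: MiniEitherT[Unit] = for {
    _ <- tryIO(say("a"))
    _ <- tryIO[Unit](throw new Exception("!"))
    r <- tryIO(say("b"))
  } yield r
}
```

Running EitherTSketch.it.run.unsafePerformIO() records only "a" and returns a Left holding the exception, which matches the behaviour of the GHCi session in the question.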
The way pipes does it is to type-class composition using the Channel type class:
class Channel p where
    {-| 'idT' acts like a \'T\'ransparent proxy, passing all requests further
        upstream, and passing all responses further downstream. -}
    idT :: (Monad m) => a' -> p a' a a' a m r
    {-| Compose two proxies, satisfying all requests from downstream with
        responses from upstream. -}
    (>->) :: (Monad m)
          => (b' -> p a' a b' b m r)
          -> (c' -> p b' b c' c m r)
          -> (c' -> p a' a c' c m r)
    p1 >-> p2 = p2 <-< p1
... and derive a lifted composition over EitherT from the base composition. This is a special case of the principle of proxy transformers, introduced in pipes-2.4, that allows lifting composition over arbitrary extensions.
This lifting requires defining an EitherT specialized to the shape of the Proxy type in Control.Proxy.Trans.Either:
newtype EitherP e p a' a b' b (m :: * -> *) r
    = EitherP { runEitherP :: p a' a b' b m (Either e r) }
This specialization to the Proxy shape is necessary in order to be able to define a well-typed instance of the Channel class. Scala might be more flexible in this regard than Haskell.
Then I just redefine the Monad instance (and other instances) along with all the ordinary EitherT operations for this specialized type:
throw :: (Monad (p a' a b' b m)) => e -> EitherP e p a' a b' b m r
throw = EitherP . return . Left
catch
    :: (Monad (p a' a b' b m))
    => EitherP e p a' a b' b m r        -- ^ Original computation
    -> (e -> EitherP f p a' a b' b m r) -- ^ Handler
    -> EitherP f p a' a b' b m r        -- ^ Handled computation
catch m f = EitherP $ do
    e <- runEitherP m
    runEitherP $ case e of
        Left  l -> f     l
        Right r -> right r
With this in hand I can then define the following lifted composition instance:
-- Given that 'p' is composable, so is 'EitherP e p'
instance (Channel p) => Channel (EitherP e p) where
    idT = EitherP . idT
    p1 >-> p2 = (EitherP .) $ runEitherP . p1 >-> runEitherP . p2
To understand what is going on there, just follow the types:
p1 :: b' -> EitherP e p a' a b' b m r
p2 :: c' -> EitherP e p b' b c' c m r
runEitherP . p1 :: b' -> p a' a b' b m (Either e r)
runEitherP . p2 :: c' -> p b' b c' c m (Either e r)
-- Use the base composition for 'p'
runEitherP . p1 >-> runEitherP . p2
    :: c' -> p a' a c' c m (Either e r)
-- Rewrap in EitherP
(EitherP . ) $ runEitherP . p1 >-> runEitherP . p2
    :: c' -> EitherP e p a' a c' c m r
This lets you throw and catch errors within a particular stage without interrupting other stages. Here's an example I've copied and pasted from my pipes-2.4 announcement post:
import Control.Monad (forever)
import Control.Monad.Trans (lift)
import Control.Proxy
import Control.Proxy.Trans.Either as E
import Safe (readMay)
promptInts :: () -> EitherP String Proxy C () () Int IO r
promptInts () = recover $ forever $ do
    str <- lift getLine
    case readMay str of
        Nothing -> E.throw "Could not parse an integer"
        Just n  -> liftP $ respond n
recover p =
    p `E.catch` (\str -> lift (putStrLn str) >> recover p)
main = runProxy $ runEitherK $ mapP printD <-< promptInts
Here's the result:
>>> main
1<Enter>
1
Test<Enter>
Could not parse an integer
Apple<Enter>
Could not parse an integer
5<Enter>
5
The answer to the iteratee approach is similar. You must take your existing way of composing iteratees and lift it over EitherT. Whether or not you use type-classes or just define a new composition operator is up to you.
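The "new composition operator" route can be sketched in Scala in its simplest possible form. This is not the pipes construction and not Scalaz API: composeE, parse, positive, and pipeline are hypothetical names, and the stages are plain functions rather than iteratees. It only illustrates composing stages so that a Left from an earlier stage bypasses the later ones.

```scala
import scala.util.Try

object EitherComposition {
  // Hypothetical operator: compose two stages that may fail with an `E`.
  // The second stage runs only if the first produced a Right.
  def composeE[E, A, B, C](f: A => Either[E, B], g: B => Either[E, C]): A => Either[E, C] =
    a => f(a) match {
      case Left(e)  => Left(e)
      case Right(b) => g(b)
    }

  // Two example stages, loosely echoing the integer prompt above.
  val parse: String => Either[String, Int] =
    s => Try(s.trim.toInt).toOption.toRight("Could not parse an integer")

  val positive: Int => Either[String, Int] =
    n => if (n > 0) Right(n) else Left("Not positive")

  val pipeline: String => Either[String, Int] = composeE(parse, positive)
}
```

With these definitions, pipeline("5") yields Right(5), while pipeline("Test") stops at the first stage with Left("Could not parse an integer").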
Some other useful links:
pipes-2.4 announcement post
Control.Proxy.Class, Control.Proxy.Trans, and Control.Proxy.Trans.Either