So the Play 2.0 Enumeratee page shows an example of using the &> (or through) method to change an Enumerator[String] into an Enumerator[Int]:
val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt }
val ints: Enumerator[Int] = strings &> toInt
There is also an Enumeratee.grouped enumeratee to create an enumerator of chunks from individual elements. That seemed to work fine.
But what I see is that the usual input would be in the form of Array[Byte] (which is returned by Enumerator.fromFile and Enumerator.fromStream). With that in mind, I would like to take those Array[Byte] inputs and turn them into an Enumerator[String], for instance where each string is a line (terminated by a '\n'). The boundaries of the lines and of the Array[Byte] elements won't usually match. How do I write an Enumeratee that can convert the chunked arrays into chunked strings?
The purpose is to chunk those lines back to the browser as each Array[Byte] becomes available, and keep the leftover bytes that were not part of a complete line until the next input chunk comes along.
Ideally I'd love to have a method that, given an iter: Iteratee[Array[Byte], T] and an Enumerator[Array[Byte]], will give me back an Enumerator[T], where my T elements were parsed by iter.
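To make the leftover-carrying requirement concrete, here is a minimal sketch that uses only the Scala standard library, not the Play iteratee API. The object LineChunks and the method lines are hypothetical names introduced for illustration. It assumes UTF-8 input and '\n' as the only terminator, and it emits a final unterminated line at end of input, which is a design choice rather than something the question specifies.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object LineChunks {
  // Hypothetical helper: turns arbitrarily sized byte chunks into complete lines.
  def lines(chunks: Iterator[Array[Byte]]): Iterator[String] = {
    // Bytes seen after the last '\n'; they belong to a line that is not complete yet.
    var leftover = Array.empty[Byte]

    val complete: Iterator[String] = chunks.flatMap { chunk =>
      val buf = leftover ++ chunk
      val lastNl = buf.lastIndexOf('\n'.toByte)
      if (lastNl < 0) {
        // No terminator in this chunk: keep everything for the next one.
        leftover = buf
        Iterator.empty
      } else {
        // Keep the bytes after the last terminator, emit everything before it.
        leftover = buf.drop(lastNl + 1)
        // Splitting on the byte 0x0A is safe in UTF-8; limit -1 keeps empty lines.
        new String(buf, 0, lastNl, UTF_8).split("\n", -1).iterator
      }
    }

    // The right-hand side of ++ is by-name, so leftover is read only
    // after every chunk has been consumed.
    complete ++ (if (leftover.nonEmpty) Iterator(new String(leftover, UTF_8))
                 else Iterator.empty)
  }
}
```

For example, feeding the chunks "bippy", "foo\nbar\nb" and "az\nend" yields the lines bippyfoo, bar, baz and end, with each complete line produced as soon as the chunk that finishes it has been read.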
Additional Info: I had a bit of time to clean up my code and here is a specific example of what I'm trying to do. I have the following iteratees that detect the next line:
import play.api.libs.iteratee._
type AB = Array[Byte]
def takeWhile(pred: Byte => Boolean): Iteratee[AB, AB] = {
  def step(e: Input[AB], acc: AB): Iteratee[AB, AB] = e match {
    case Input.EOF => Done(acc, Input.EOF)
    case Input.Empty => Cont(step(_, acc))
    case Input.El(arr) =>
      val (taking, rest) = arr.span(pred)
      if (rest.length > 0) Done(acc ++ taking, Input.El(rest))
      else Cont(step(_, acc ++ taking))
  }
  Cont(step(_, Array()))
}
val line = for {
  bytes <- takeWhile(b => !(b == '\n' || b == '\r'))
  _ <- takeWhile(b => b == '\n' || b == '\r')
} yield bytes
And what I'd like to do is something like this:
Ok.stream(Enumerator.fromFile(filename) &> chunkBy(line)).as("text/plain")
This commit does something similar to what you are doing: https://github.com/playframework/Play20/commit/f979006a7e2c1c08ca56ee0bae67b5463ee099c1#L3R131. I fixed grouped to take care of the remaining input. The code basically looks like:
val upToNewLine =
  Traversable.splitOnceAt[String,Char](_ != '\n') &>>
  Iteratee.consume()
Enumeratee.grouped(upToNewLine)
I also have to fix repeat in the same way.
Here is what I have after some hours of experimentation. I'm hoping that somebody can come up with a more elegant implementation, as I can barely follow mine.
def chunkBy(chunker: Iteratee[AB, AB]) = new Enumeratee[AB, AB] {
  def applyOn[A](inner: Iteratee[AB, A]): Iteratee[AB, Iteratee[AB, A]] = {
    def step(e: Input[AB], in: Iteratee[AB, A], leftover: Input[AB]):
        Iteratee[AB, Iteratee[AB, A]] = {
      e match {
        case Input.EOF =>
          // if we have a leftover and it's a chunk, then output it
          leftover match {
            case Input.EOF | Input.Empty => Done(in, leftover)
            case Input.El(_) =>
              val lastChunk = Iteratee.flatten(Enumerator.enumInput(leftover)
                >>> Enumerator.eof |>> chunker)
              lastChunk.pureFlatFold(
                done = { (chunk, rest) =>
                  val nextIn = Iteratee.flatten(Enumerator(chunk) |>> in)
                  nextIn.pureFlatFold(
                    done = (a, e2) => Done(nextIn, e2),
                    // nothing more will come
                    cont = k => Done(nextIn, Input.EOF),
                    error = (msg, e2) => Error(msg, e2))
                },
                // not enough content to get a chunk, so drop content
                cont = k => Done(in, Input.EOF),
                error = (msg, e2) => Error(msg, e2))
          }
        case Input.Empty => Cont(step(_, in, leftover))
        case Input.El(arr) =>
          // feed through chunker
          val iChunks = Iteratee.flatten(
            Enumerator.enumInput(leftover)
              >>> Enumerator(arr)
              >>> Enumerator.eof // to extract the leftover
              |>> repeat(chunker))
          iChunks.pureFlatFold(
            done = { (chunks, rest) =>
              // we have our chunks, feed them to the inner iteratee
              val nextIn = Iteratee.flatten(Enumerator(chunks: _*) |>> in)
              nextIn.pureFlatFold(
                done = (a, e2) => Done(nextIn, e2),
                // inner iteratee needs more data
                cont = k => Cont(step(_: Input[AB], nextIn,
                  // we have to ignore the EOF we fed to repeat
                  if (rest == Input.EOF) Input.Empty else rest)),
                error = (msg, e2) => Error(msg, e2))
            },
            // not enough content to get a chunk, continue
            cont = k => Cont(step(_: Input[AB], in, leftover)),
            error = (msg, e2) => Error(msg, e2))
      }
    }
    Cont(step(_, inner, Input.Empty))
  }
}
Here is the definition of my custom repeat:
// withhold the last chunk so that it may be concatenated with the next one
def repeat(chunker: Iteratee[AB, AB]) = {
  def loop(e: Input[AB], ch: Iteratee[AB, AB], acc: Vector[AB],
      leftover: Input[AB]): Iteratee[AB, Vector[AB]] = e match {
    case Input.EOF => ch.pureFlatFold(
      done = (a, e) => Done(acc, leftover),
      cont = k => k(Input.EOF).pureFlatFold(
        done = (a, e) => Done(acc, Input.El(a)),
        cont = k => sys.error("divergent iter"),
        error = (msg, e) => Error(msg, e)),
      error = (msg, e) => Error(msg, e))
    case Input.Empty => Cont(loop(_, ch, acc, leftover))
    case Input.El(_) =>
      val i = Iteratee.flatten(Enumerator.enumInput(leftover)
        >>> Enumerator.enumInput(e) |>> ch)
      i.pureFlatFold(
        done = (a, e) => loop(e, chunker, acc :+ a, Input.Empty),
        cont = k => Cont(loop(_, i, acc, Input.Empty)),
        error = (msg, e) => Error(msg, e))
  }
  Cont(loop(_: Input[AB], chunker, Vector(), Input.Empty))
}
This works on a few samples including this one:
val source = Enumerator(
  "bippy".getBytes,
  "foo\n\rbar\n\r\n\rbaz\nb".getBytes,
  "azam\ntoto\n\n".getBytes)
Ok.stream(source
  &> chunkBy(line)
  &> Enumeratee.map(l => l ++ ".\n".getBytes)
).as("text/plain")
Which prints:
bippyfoo.
bar.
baz.
bazam.
toto.
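The output above can be cross-checked without Play. The sketch below is only a model, not the iteratee code: it joins the three sample chunks and splits on runs of '\n' or '\r', mirroring the two takeWhile calls in line. The object SampleCheck is a hypothetical name, and because the whole input is joined first, this does not exercise the chunk-boundary handling.

```scala
object SampleCheck {
  // The same three chunks as in the sample, as plain strings.
  val chunks: Seq[String] =
    Seq("bippy", "foo\n\rbar\n\r\n\rbaz\nb", "azam\ntoto\n\n")

  // A run of terminators counts as a single separator, so no empty lines appear;
  // String.split also drops the trailing empty string left by the final "\n\n".
  val lines: Seq[String] = chunks.mkString.split("[\\n\\r]+").toSeq

  // Same decoration as the Enumeratee.map step in the sample.
  val rendered: String = lines.map(_ + ".\n").mkString
}
```

Here SampleCheck.lines contains bippyfoo, bar, baz, bazam and toto, which matches the printed result: "bippy" has no terminator, so it is joined with the start of the next chunk, and "\n\r\n\r" collapses into one separator.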