 

How to write an enumeratee to chunk an enumerator along different boundaries

So the Play 2.0 Enumeratee page shows an example of using the &> (or through) method to turn an Enumerator[String] into an Enumerator[Int]:

val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt }
val ints: Enumerator[Int] = strings &> toInt

There is also an Enumeratee.grouped enumeratee to create an enumerator of chunks from individual elements. That seemed to work fine.

But in practice the input usually arrives as Array[Byte] (which is what Enumerator.fromFile and Enumerator.fromStream produce). With that in mind, I would like to take those Array[Byte] inputs and turn them into an Enumerator[String], for instance one where each string is a line (terminated by '\n'). The line boundaries and the Array[Byte] chunk boundaries won't usually match. How do I write an enumeratee that can convert the chunked arrays into chunked strings?

The purpose is to stream those lines back to the browser in chunks as each Array[Byte] becomes available, keeping the leftover bytes that were not part of a complete line until the next input chunk comes along.
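The core difficulty is carrying the incomplete tail of one chunk over to the next. Below is a minimal sketch of that bookkeeping in plain Scala, without Play's iteratee library. It splits on '\n' only and assumes ASCII text. The names LineSplitter, step and splitAll are hypothetical.

```scala
object LineSplitter {
  private val Nl: Byte = '\n'.toByte

  /** Splits `leftover ++ chunk` into complete lines (without the '\n')
    * and returns the trailing, still-incomplete bytes as the new leftover. */
  def step(leftover: Array[Byte], chunk: Array[Byte]): (Vector[String], Array[Byte]) = {
    val buf = leftover ++ chunk
    val lines = Vector.newBuilder[String]
    var start = 0
    for (i <- buf.indices if buf(i) == Nl) {
      lines += new String(buf, start, i - start, "US-ASCII")
      start = i + 1
    }
    (lines.result(), buf.drop(start))
  }

  /** Feeds every chunk through `step`; whatever is left at the end is flushed as a last line. */
  def splitAll(chunks: Seq[Array[Byte]]): Vector[String] = {
    val (lines, tail) = chunks.foldLeft((Vector.empty[String], Array.emptyByteArray)) {
      case ((acc, left), chunk) =>
        val (complete, newLeft) = step(left, chunk)
        (acc ++ complete, newLeft)
    }
    if (tail.isEmpty) lines else lines :+ new String(tail, "US-ASCII")
  }
}
```

In a chunking enumeratee, step would run on each Input.El, and the final flush would run on Input.EOF.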

Ideally I'd like a method that, given an iter: Iteratee[Array[Byte], T] and an Enumerator[Array[Byte]], gives me back an Enumerator[T] whose T elements were parsed by iter.
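The desired signature can be approximated outside Play with plain Iterators. This is only a sketch under stated assumptions. A hypothetical Parser[T] function stands in for Iteratee[Array[Byte], T]: it either parses one element and returns the unconsumed bytes, or returns None to ask for more input. ChunkBy, chunkBy and line are illustrative names and are not Play API.

```scala
object ChunkBy {
  /** Hypothetical stand-in for Iteratee[Array[Byte], T]: given the buffered bytes
    * and whether input is exhausted, parse one T plus the unconsumed rest, or None. */
  type Parser[T] = (Array[Byte], Boolean) => Option[(T, Array[Byte])]

  def chunkBy[T](parse: Parser[T])(chunks: Iterator[Array[Byte]]): Iterator[T] =
    new Iterator[T] {
      private var buffer: Array[Byte] = Array.emptyByteArray
      private var ready: Option[T] = None
      private var finished = false

      // Pull chunks until the parser yields an element or the input runs dry.
      private def fill(): Unit =
        while (ready.isEmpty && !finished) {
          parse(buffer, !chunks.hasNext) match {
            case Some((t, rest))        => ready = Some(t); buffer = rest
            case None if chunks.hasNext => buffer = buffer ++ chunks.next()
            case None                   => finished = true // EOF, nothing parseable left
          }
        }

      def hasNext: Boolean = { fill(); ready.isDefined }

      def next(): T = {
        fill()
        val t = ready.getOrElse(throw new NoSuchElementException("end of input"))
        ready = None
        t
      }
    }

  /** Example parser: one '\n'-terminated line; at EOF a non-empty tail also counts. */
  val line: Parser[String] = (buf, atEof) => {
    val i = buf.indexOf('\n'.toByte)
    if (i >= 0) Some((new String(buf, 0, i, "US-ASCII"), buf.drop(i + 1)))
    else if (atEof && buf.nonEmpty) Some((new String(buf, "US-ASCII"), Array.emptyByteArray))
    else None
  }
}
```

The result is a lazy Iterator, so a chunk is pulled only when the next element is requested. A parser that returns Some without consuming any bytes would yield the same element forever, just as an iteratee that finishes without consuming input would.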

Additional info: I had some time to clean up my code, and here is a specific example of what I'm trying to do. I have the following iteratees that detect the next line:

import play.api.libs.iteratee._
type AB = Array[Byte]

def takeWhile(pred: Byte => Boolean): Iteratee[AB, AB] = {
  def step(e: Input[AB], acc: AB): Iteratee[AB, AB] = e match {
    case Input.EOF => Done(acc, Input.EOF)
    case Input.Empty => Cont(step(_, acc))
    case Input.El(arr) =>
      val (taking, rest) = arr.span(pred)
      if (rest.length > 0) Done(acc ++ taking, Input.El(rest)) 
      else Cont(step(_, acc ++ taking)) 
  }
  Cont(step(_, Array()))
}

val line = for {
  bytes <- takeWhile(b => !(b == '\n' || b == '\r'))
  _     <- takeWhile(b =>   b == '\n' || b == '\r')
} yield bytes

And what I'd like to do is something like this:

Ok.stream(Enumerator.fromFile(filename) &> chunkBy(line)).as("text/plain")
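The following shows why line finishes with leftover input. It uses a stripped-down, hypothetical model of the Input/Done/Cont protocol in plain Scala. These classes only mirror the shape of Play's and are not its API. The model transcribes takeWhile and line onto it and adds a small hypothetical feedAll helper that pushes chunks until the iteratee is done.

```scala
object MiniIteratee {
  type AB = Array[Byte]

  // Hypothetical miniature of the iteratee protocol (not Play's classes).
  sealed trait Input[+E]
  final case class El[E](e: E) extends Input[E]
  case object Empty extends Input[Nothing]
  case object EOF extends Input[Nothing]

  sealed trait Iter[E, A] {
    def flatMap[B](f: A => Iter[E, B]): Iter[E, B] = this match {
      case Done(a, rest) =>
        f(a) match {
          case Cont(k)    => k(rest) // the next iteratee starts on the leftover
          case Done(b, _) => Done[E, B](b, rest)
        }
      case Cont(k) => Cont[E, B](in => k(in).flatMap(f))
    }
    def map[B](f: A => B): Iter[E, B] = flatMap(a => Done[E, B](f(a), Empty))
  }
  final case class Done[E, A](a: A, rest: Input[E]) extends Iter[E, A]
  final case class Cont[E, A](k: Input[E] => Iter[E, A]) extends Iter[E, A]

  // Same logic as the question's takeWhile, on the miniature model.
  def takeWhile(pred: Byte => Boolean): Iter[AB, AB] = {
    def step(acc: AB)(in: Input[AB]): Iter[AB, AB] = in match {
      case EOF   => Done[AB, AB](acc, EOF)
      case Empty => Cont[AB, AB](step(acc))
      case El(arr) =>
        val (taking, rest) = arr.span(pred)
        if (rest.nonEmpty) Done[AB, AB](acc ++ taking, El(rest))
        else Cont[AB, AB](step(acc ++ taking))
    }
    Cont[AB, AB](step(Array.emptyByteArray))
  }

  private def isNl(b: Byte): Boolean = b == '\n'.toByte || b == '\r'.toByte

  val line: Iter[AB, AB] = for {
    bytes <- takeWhile(b => !isNl(b))
    _     <- takeWhile(isNl)
  } yield bytes

  // Hypothetical helper: feed chunks one by one until the iteratee is Done.
  def feedAll[A](it: Iter[AB, A], chunks: List[AB]): Iter[AB, A] = (it, chunks) match {
    case (Cont(k), c :: cs) => feedAll(k(El(c)), cs)
    case _                  => it // finished, or no more chunks
  }
}
```

When fed "foo", "\n\rba" and "r", line is already Done after the second chunk. It holds "foo", with El("ba") as unconsumed input, and a chunking enumeratee has to hand that leftover to the next run of line.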
asked Apr 27 '12 07:04 by huynhjl




2 Answers

The commit https://github.com/playframework/Play20/commit/f979006a7e2c1c08ca56ee0bae67b5463ee099c1#L3R131 does something similar to what you are doing. I fixed grouped to take care of the remaining input. The code basically looks like this:

val upToNewLine = 
  Traversable.splitOnceAt[String,Char](_ != '\n')  &>>
  Iteratee.consume()

Enumeratee.grouped(upToNewLine)

I also have to fix repeat in the same way.
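The restart loop that Enumeratee.grouped performs can be sketched with a similar hypothetical miniature model, which again is not Play's classes. Here upToNewLine is a hand-written stand-in for Traversable.splitOnceAt(...) &>> Iteratee.consume(), returning each line without its '\n'. The names GroupedSketch, upToNewLine and grouped are illustrative assumptions.

```scala
import scala.annotation.tailrec

object GroupedSketch {
  type AB = Array[Byte]

  // Hypothetical miniature of the iteratee protocol (not Play's classes).
  sealed trait Input[+E]
  final case class El[E](e: E) extends Input[E]
  case object Empty extends Input[Nothing]
  case object EOF extends Input[Nothing]

  sealed trait Iter[E, A]
  final case class Done[E, A](a: A, rest: Input[E]) extends Iter[E, A]
  final case class Cont[E, A](k: Input[E] => Iter[E, A]) extends Iter[E, A]

  // Stand-in for splitOnceAt(_ != '\n') &>> consume(): one line, '\n' dropped.
  val upToNewLine: Iter[AB, String] = {
    def step(acc: AB)(in: Input[AB]): Iter[AB, String] = in match {
      case El(arr) =>
        val i = arr.indexOf('\n'.toByte)
        if (i < 0) Cont[AB, String](step(acc ++ arr))
        else {
          val rest = arr.drop(i + 1)
          val left: Input[AB] = if (rest.isEmpty) Empty else El(rest)
          Done[AB, String](new String(acc ++ arr.take(i), "US-ASCII"), left)
        }
      case Empty => Cont[AB, String](step(acc))
      case EOF   => Done[AB, String](new String(acc, "US-ASCII"), EOF)
    }
    Cont[AB, String](step(Array.emptyByteArray))
  }

  // Runs `inner` repeatedly over the chunks, restarting it on each run's leftover.
  def grouped[A](inner: Iter[AB, A], chunks: List[AB]): Vector[A] = {
    @tailrec
    def loop(cur: Iter[AB, A], started: Boolean, pending: List[Input[AB]], out: Vector[A]): Vector[A] =
      cur match {
        case Done(a, El(r)) => loop(inner, false, El(r) :: pending, out :+ a) // re-feed leftover
        case Done(a, Empty) => loop(inner, false, pending, out :+ a)
        case Done(a, EOF)   => out :+ a
        case Cont(k) =>
          pending match {
            case EOF :: _ if !started => out // fresh run saw only EOF: emit nothing
            case i :: rest            => loop(k(i), true, rest, out)
            case Nil                  => out
          }
      }
    loop(inner, false, chunks.map(c => El(c)) :+ EOF, Vector.empty)
  }
}
```

One design choice in this sketch: if EOF reaches a fresh run that has not consumed anything, no element is emitted. Input that ends in '\n' therefore does not produce a trailing empty line.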

answered Nov 09 '22 12:11 by Sadache


Here is what I have after some hours of experimentation. I'm hoping that somebody can come up with a more elegant implementation, as I can barely follow mine.

def chunkBy(chunker: Iteratee[AB, AB]) = new Enumeratee[AB, AB] {
  def applyOn[A](inner: Iteratee[AB, A]): Iteratee[AB, Iteratee[AB, A]] = {
    def step(e: Input[AB], in: Iteratee[AB, A], leftover: Input[AB]):
          Iteratee[AB, Iteratee[AB, A]] = {
      e match {
        case Input.EOF =>
          // if we have a leftover and it's a chunk, then output it
          leftover match {
            case Input.EOF | Input.Empty => Done(in, leftover)
            case Input.El(_) =>
              val lastChunk = Iteratee.flatten(Enumerator.enumInput(leftover)
                >>> Enumerator.eof |>> chunker)
              lastChunk.pureFlatFold(
                done = { (chunk, rest) =>
                  val nextIn = Iteratee.flatten(Enumerator(chunk) |>> in)
                  nextIn.pureFlatFold(
                    done = (a, e2) => Done(nextIn, e2),
                    // nothing more will come
                    cont = k => Done(nextIn, Input.EOF),
                    error = (msg, e2) => Error(msg, e2))
                },
                // not enough content to get a chunk, so drop content
                cont = k => Done(in, Input.EOF),
                error = (msg, e2) => Error(msg, e2))
          }
        case Input.Empty => Cont(step(_, in, leftover))
        case Input.El(arr) =>
          // feed through chunker
          val iChunks = Iteratee.flatten(
            Enumerator.enumInput(leftover)
              >>> Enumerator(arr)
              >>> Enumerator.eof // to extract the leftover
              |>> repeat(chunker))
          iChunks.pureFlatFold(
            done = { (chunks, rest) =>
              // we have our chunks, feed them to the inner iteratee
              val nextIn = Iteratee.flatten(Enumerator(chunks: _*) |>> in)
              nextIn.pureFlatFold(
                done = (a, e2) => Done(nextIn, e2),
                // inner iteratee needs more data
                cont = k => Cont(step(_: Input[AB], nextIn,
                  // we have to ignore the EOF we fed to repeat
                  if (rest == Input.EOF) Input.Empty else rest)),
                error = (msg, e2) => Error(msg, e2))
            },
            // not enough content to get a chunk, continue
            cont = k => Cont(step(_: Input[AB], in, leftover)),
            error = (msg, e2) => Error(msg, e2))
      }
    }
    Cont(step(_, inner, Input.Empty))
  }
}

Here is the definition of my custom repeat:

// withhold the last chunk so that it may be concatenated with the next one
def repeat(chunker: Iteratee[AB, AB]) = {
  def loop(e: Input[AB], ch: Iteratee[AB, AB], acc: Vector[AB], 
        leftover: Input[AB]): Iteratee[AB, Vector[AB]] = e match {
    case Input.EOF => ch.pureFlatFold(
      done = (a, e) => Done(acc, leftover),
      cont = k => k(Input.EOF).pureFlatFold(
        done = (a, e) => Done(acc, Input.El(a)),
        cont = k => sys.error("divergent iter"),
        error = (msg, e) => Error(msg, e)),
      error = (msg, e) => Error(msg, e))
    case Input.Empty => Cont(loop(_, ch, acc, leftover))
    case Input.El(_) =>
      val i = Iteratee.flatten(Enumerator.enumInput(leftover) 
          >>> Enumerator.enumInput(e) |>> ch)
      i.pureFlatFold(
        done = (a, e) => loop(e, chunker, acc :+ a, Input.Empty),
        cont = k => Cont(loop(_, i, acc, Input.Empty)),
        error = (msg, e) => Error(msg, e))
  }
  Cont(loop(_: Input[AB], chunker, Vector(), Input.Empty))
}

This works on a few samples, including this one:

 val source = Enumerator(
   "bippy".getBytes,
   "foo\n\rbar\n\r\n\rbaz\nb".getBytes,
   "azam\ntoto\n\n".getBytes)
 Ok.stream(source 
   &> chunkBy(line) 
   &> Enumeratee.map[AB](l => l ++ ".\n".getBytes)
 ).as("text/plain")

Which prints:

bippyfoo.
bar.
baz.
bazam.
toto.
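The output above can be reproduced without Play using a small splitter. Like line, it treats a run of '\n'/'\r' bytes as a single separator and withholds the unterminated tail until the next chunk arrives. SampleReplay and its methods are hypothetical names. This sketch also drops empty lines entirely, whereas line could yield an empty line if the input started with a separator.

```scala
object SampleReplay {
  private def isNl(b: Byte): Boolean = b == '\n'.toByte || b == '\r'.toByte

  /** Emits every line of `leftover ++ chunk` that is followed by at least one
    * '\n'/'\r' byte and withholds the unterminated tail for the next chunk.
    * A run of separators counts as one, so no empty lines are produced. */
  def step(leftover: Array[Byte], chunk: Array[Byte]): (Vector[Array[Byte]], Array[Byte]) = {
    var rest = (leftover ++ chunk).dropWhile(isNl)
    val lines = Vector.newBuilder[Array[Byte]]
    var idx = rest.indexWhere(isNl)
    while (idx >= 0) {
      lines += rest.take(idx)
      rest = rest.drop(idx).dropWhile(isNl)
      idx = rest.indexWhere(isNl)
    }
    (lines.result(), rest)
  }

  def run(chunks: Seq[Array[Byte]]): Vector[String] = {
    val (lines, tail) = chunks.foldLeft((Vector.empty[Array[Byte]], Array.emptyByteArray)) {
      case ((acc, left), c) =>
        val (done, newLeft) = step(left, c)
        (acc ++ done, newLeft)
    }
    val all = if (tail.isEmpty) lines else lines :+ tail
    // Same post-processing as the Enumeratee.map in the sample: append ".\n".
    all.map(l => new String(l ++ ".\n".getBytes("US-ASCII"), "US-ASCII"))
  }
}
```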
answered Nov 09 '22 10:11 by huynhjl