 

Use Scala Iterator to break up large stream (from string) into chunks using a RegEx match, and then operate on those chunks?

I'm currently using a not-very-Scala-like approach to parse large Unix mailbox files. I'm still learning the language and would like to challenge myself to find a better way; however, I don't believe I have a solid grasp of just what can be done with an Iterator and how to use it effectively.

I'm currently using org.apache.james.mime4j, and I use the org.apache.james.mime4j.mboxiterator.MboxIterator to get a java.util.Iterator from a file, as so:

 // registers an implementation of a ContentHandler that
 // allows me to construct an object representing an email
 // using callbacks
 val handler: ContentHandler = new MyHandler();

 // creates a parser that parses a SINGLE email from a given InputStream
 val parser: MimeStreamParser = new MimeStreamParser(configBuilder.build());
 // register my handler
 parser.setContentHandler(handler);

 // Get a java.util.Iterator
 val iterator = MboxIterator.fromFile(fileName).build();
 // For each email, process it using above Handler
 iterator.forEach(p => parser.parse(p.asInputStream(Charsets.UTF_8)))

From my understanding, the Scala Iterator is much more robust, and probably a lot more capable of handling something like this, especially because I won't always be able to fit the full file in memory.

I need to construct my own version of the MboxIterator. I dug through the source for MboxIterator and found a good RegEx pattern for determining the beginning of individual email messages, but I'm drawing a blank from there on.

I created the RegEx like so:

 val MESSAGE_START = Pattern.compile(FromLinePatterns.DEFAULT, Pattern.MULTILINE);

What I want to do (based on what I know so far):

  • Build a FileInputStream from an MBOX file.
  • Use Iterator.continually(stream.read()) to read through the stream
  • Use .takeWhile() to continue to read until the end of the stream
  • Chunk the stream using something like MESSAGE_START.matcher(someString).find(), or use it to find the indexes that separate the messages
  • Read the chunks created, or read the bits in between the indexes created
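The bullet steps above could be sketched roughly like this. Note that this is a toy sketch, not the real thing: a literal "From " prefix check stands in for mime4j's FromLinePatterns.DEFAULT, the sample data is made up, and mkString realizes the whole text in memory (so it is not lazy, which is exactly the limitation the question is about).

```scala
import java.io.ByteArrayInputStream

// Toy stand-in for an mbox file (hypothetical sample data).
val sample = "From a@x Fri Jul 12 2019\nbody A\nFrom b@y Sat Jul 13 2019\nbody B\n"
val stream = new ByteArrayInputStream(sample.getBytes("UTF-8"))

// Read the stream with Iterator.continually, stopping at end-of-stream.
val chars: Iterator[Char] =
  Iterator.continually(stream.read()).takeWhile(_ != -1).map(_.toChar)

// Group lines into messages, starting a new message at each "From " line.
// Caveat: mkString realizes the whole text, so this sketch is NOT lazy.
val messages = chars.mkString.linesIterator
  .foldLeft(List.empty[List[String]]) {
    case (acc, line) if line.startsWith("From ") => List(line) :: acc
    case (head :: tail, line)                    => (line :: head) :: tail
    case (Nil, line)                             => List(List(line))
  }
  .map(_.reverse.mkString("\n"))
  .reverse
```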

I feel like I should be able to use map(), find(), filter() and collect() to accomplish this, but I'm getting thrown off by the fact that stream.read() only gives me Ints to work with.

How would I accomplish this?

EDIT:

After doing some more thinking on the subject, I thought of another way to describe what I think I need to do:

  1. I need to keep reading from the stream until I get a string that matches my RegEx

  2. Maybe group the previously read bytes?

  3. Send it off to be processed somewhere

  4. Remove it from the scope somehow so it doesn't get grouped the next time I run into a match

  5. Continue to read the stream until I find the next match.

  6. Profit???

EDIT 2:

I think I'm getting closer. Using a method like this gets me an iterator of iterators. However, there are two issues:

  1. Is this a waste of memory? Does this mean everything gets read into memory?

  2. I still need to figure out a way to split by the match, but still include it in the returned iterator.

 def split[T](iter: Iterator[T])(breakOn: T => Boolean): Iterator[Iterator[T]] =
   new Iterator[Iterator[T]] {
     def hasNext = iter.hasNext

     def next = {
       val cur = iter.takeWhile(!breakOn(_))
       iter.dropWhile(breakOn)
       cur
     }
   }.withFilter(l => l.nonEmpty)
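For reference, the takeWhile/dropWhile calls above consume the shared underlying iterator, which makes the returned inner iterators unreliable. One way around that (a sketch, not the only approach) is to use .buffered so the delimiter can be peeked at without being dropped, and to materialize each chunk eagerly as a List while keeping the outer iterator lazy. This also keeps the matching delimiter element inside its chunk:

```scala
// Sketch: split an iterator into chunks, each chunk starting at an element
// for which isHeader is true; the header element stays in its chunk.
def split[T](iter: Iterator[T])(isHeader: T => Boolean): Iterator[List[T]] =
  new Iterator[List[T]] {
    private val buf = iter.buffered   // allows peeking via buf.head
    def hasNext: Boolean = buf.hasNext
    def next(): List[T] = {
      val chunk = scala.collection.mutable.ListBuffer[T](buf.next())
      while (buf.hasNext && !isHeader(buf.head))
        chunk += buf.next()
      chunk.toList
    }
  }

val parts = split(Iterator("From a", "x", "y", "From b", "z"))(_.startsWith("From"))
```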
asked Jul 14 '19 by foxtrotuniform6969


1 Answer

If I understand correctly, you want to lazily chunk a large file delimited by a regex recognizable pattern.

You could try to return an Iterator for each request, but correct iterator management would not be trivial.

I'd be inclined to hide all file and iterator management from the client.

class MBox(filePath :String) {
  private val file   = io.Source.fromFile(filePath)
  private val itr    = file.getLines().buffered
  private val header = "From .+ \\d{4}".r  //adjust to taste

  def next() :Option[String] =
    if (itr.hasNext) {
      val sb = new StringBuilder()
      sb.append(itr.next() + "\n")
      while (itr.hasNext && !header.matches(itr.head))
        sb.append(itr.next() + "\n")
      Some(sb.mkString)
    } else {
      file.close()
      None
    }
}

testing:

val mbox = new MBox("so.txt")
mbox.next()
//res0: Option[String] =
//Some(From MAILER-DAEMON Fri Jul  8 12:08:34 2011
//some text AAA
//some text BBB
//)

mbox.next()
//res1: Option[String] =
//Some(From MAILER-DAEMON Mon Jun  8 12:18:34 2012
//small text
//)

mbox.next()
//res2: Option[String] =
//Some(From MAILER-DAEMON Tue Jan  8 11:18:14 2013
//some text CCC
//some text DDD
//)

mbox.next()  //res3: Option[String] = None

There is only one Iterator per open file and only the safe methods are invoked on it. The file text is realized (loaded) only on request, and the client gets just what's requested, if available. Instead of returning each message as one long String, you could return its lines as a collection, Seq[String], if that's more applicable.
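That Seq[String] variant might look like the following sketch. To keep the snippet self-contained it takes an in-memory Iterator[String] instead of opening a file; the peek-ahead loop is the same.

```scala
// Sketch: same peek-ahead grouping, but each message comes back as lines.
class MBoxLines(lines: Iterator[String]) {
  private val itr    = lines.buffered
  private val header = "From .+ \\d{4}".r  //adjust to taste

  def next(): Option[Seq[String]] =
    if (itr.hasNext) {
      val msg = scala.collection.mutable.ListBuffer(itr.next())
      while (itr.hasNext && !header.matches(itr.head))
        msg += itr.next()
      Some(msg.toSeq)
    } else None
}

val mb = new MBoxLines(Iterator(
  "From MAILER-DAEMON Fri Jul  8 12:08:34 2011", "some text AAA",
  "From MAILER-DAEMON Mon Jun  8 12:18:34 2012", "small text"))
```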


UPDATE: This can be modified for easy iteration.

class MBox(filePath :String) extends Iterator[String] {
  private val file   = io.Source.fromFile(filePath)
  private val itr    = file.getLines().buffered
  private val header = "From .+ \\d{4}".r  //adjust to taste

  def next() :String = {
    val sb = new StringBuilder()
    sb.append(itr.next() + "\n")
    while (itr.hasNext && !header.matches(itr.head))
      sb.append(itr.next() + "\n")
    sb.mkString
  }

  def hasNext: Boolean =
    if (itr.hasNext) true else {file.close(); false}
}

Now you can .foreach(), .map(), .flatMap(), etc. But you can also do dangerous things like .toList, which will load the entire file.
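A self-contained usage sketch of this Iterator version (the class is repeated so the snippet runs standalone; the temp-file setup and sample messages are just for demonstration). Only the messages actually pulled through take(1) are realized:

```scala
import java.nio.file.Files

class MBox(filePath: String) extends Iterator[String] {
  private val file   = io.Source.fromFile(filePath)
  private val itr    = file.getLines().buffered
  private val header = "From .+ \\d{4}".r  //adjust to taste

  def next(): String = {
    val sb = new StringBuilder()
    sb.append(itr.next() + "\n")
    while (itr.hasNext && !header.matches(itr.head))
      sb.append(itr.next() + "\n")
    sb.mkString
  }

  def hasNext: Boolean =
    if (itr.hasNext) true else { file.close(); false }
}

// Write a hypothetical two-message mbox to a temp file.
val path = Files.createTempFile("mbox", ".txt")
Files.write(path,
  "From a@x Fri Jul 12 12:00:00 2019\nAAA\nFrom b@y Sat Jul 13 12:00:00 2019\nBBB\n"
    .getBytes("UTF-8"))

// take(1) pulls only the first message; the rest of the file is not read.
val first = new MBox(path.toString).take(1).toList
```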

answered Nov 15 '22 by jwvh