I'm currently using a not-very-Scala-like approach to parse large Unix mailbox files. I'm still learning the language and would like to challenge myself to find a better way. However, I don't think I have a solid grasp of what can be done with an Iterator and how to use it effectively.
I'm currently using org.apache.james.mime4j, and I use the org.apache.james.mime4j.mboxiterator.MboxIterator to get a java.util.Iterator from a file, like so:
// registers an implementation of a ContentHandler that
// allows me to construct an object representing an email
// using callbacks
val handler: ContentHandler = new MyHandler();
// creates a parser that parses a SINGLE email from a given InputStream
val parser: MimeStreamParser = new MimeStreamParser(configBuilder.build());
// register my handler
parser.setContentHandler(handler);
// Get a java.util.Iterator
val iterator = MboxIterator.fromFile(fileName).build();
// For each email, process it using above Handler
iterator.forEach(p => parser.parse(p.asInputStream(Charsets.UTF_8)))
From my understanding, the Scala Iterator is much more robust and probably a lot more capable of handling something like this, especially because I won't always be able to fit the full file in memory.
I need to construct my own version of the MboxIterator. I dug through the source for MboxIterator and was able to find a good RegEx pattern for detecting the beginning of individual email messages, but I'm drawing a blank on where to go from there.
I created the RegEx like so:
val MESSAGE_START = Pattern.compile(FromLinePatterns.DEFAULT, Pattern.MULTILINE);
What I want to do (based on what I know so far):
Open a FileInputStream from an MBOX file.
Use Iterator.continually(stream.read()) to read through the stream.
Use takeWhile() to continue to read until the end of the stream.
Split the input with something like MESSAGE_START.matcher(someString).find(), or use it to find the indexes that separate the messages.
I feel like I should be able to use map(), find(), filter() and collect() to accomplish this, but I'm getting thrown off by the fact that they only give me Ints to work with.
How would I accomplish this?
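For context, a minimal sketch of where the Int values come from: InputStream.read() returns one byte at a time as an Int, or -1 at the end of the stream, so every combinator applied to that iterator sees Ints. Wrapping the same stream in scala.io.Source yields lines lazily instead. The object and method names below are hypothetical, and decoding as UTF-8 is an assumption carried over from the code above.

```scala
import java.io.InputStream
import scala.io.{Codec, Source}

object ByteVsLine {
  // Raw bytes: read() returns each byte as an Int in 0..255, or -1 at end of stream.
  def rawBytes(in: InputStream): Iterator[Int] =
    Iterator.continually(in.read()).takeWhile(_ != -1)

  // Lines: Source decodes the bytes (UTF-8 assumed) and getLines() yields Strings lazily.
  def lines(in: InputStream): Iterator[String] =
    Source.fromInputStream(in)(Codec.UTF8).getLines()
}
```

Working on lines rather than bytes makes a line-based pattern such as the "From " separator much easier to apply.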
EDIT:
After doing some more thinking on the subject, I thought of another way to describe what I think I need to do:
I need to keep reading from the stream until I get a string that matches my RegEx
Maybe group the previously read bytes?
Send it off to be processed somewhere
Remove it from the scope somehow so it doesn't get grouped the next time I run into a match
Continue to read the stream until I find the next match.
Profit???
EDIT 2:
I think I'm getting closer. Using a method like this gets me an iterator of iterators. However, there are two issues:
1. Is this a waste of memory? Does this mean everything gets read into memory?
2. I still need to figure out a way to split by the match, but still include it in the iterator returned.
def split[T](iter: Iterator[T])(breakOn: T => Boolean):
    Iterator[Iterator[T]] =
  new Iterator[Iterator[T]] {
    def hasNext = iter.hasNext
    def next = {
      val cur = iter.takeWhile(!breakOn(_))
      iter.dropWhile(breakOn)
      cur
    }
  }.withFilter(l => l.nonEmpty)
If I understand correctly, you want to lazily chunk a large file delimited by a pattern that a regex can recognize.
You could try to return an Iterator for each request, but correct iterator management would not be trivial.
I'd be inclined to hide all file and iterator management from the client.
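class MBox(filePath :String) {
  private val file = io.Source.fromFile(filePath)
  private val itr = file.getLines().buffered // buffered, so head can be peeked
  private val header = "From .+ \\d{4}".r //adjust to taste

  // Returns the next message, or None once the file is exhausted.
  def next() :Option[String] =
    if (itr.hasNext) {
      val sb = new StringBuilder()
      sb.append(itr.next() + "\n") // first line of this message
      // Collect lines until the next header line or the end of the file.
      // Regex.matches requires Scala 2.13 or later.
      while (itr.hasNext && !header.matches(itr.head))
        sb.append(itr.next() + "\n")
      Some(sb.mkString)
    } else {
      file.close()
      None
    }
}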
testing:
val mbox = new MBox("so.txt")
mbox.next()
//res0: Option[String] =
//Some(From MAILER-DAEMON Fri Jul 8 12:08:34 2011
//some text AAA
//some text BBB
//)
mbox.next()
//res1: Option[String] =
//Some(From MAILER-DAEMON Mon Jun 8 12:18:34 2012
//small text
//)
mbox.next()
//res2: Option[String] =
//Some(From MAILER-DAEMON Tue Jan 8 11:18:14 2013
//some text CCC
//some text DDD
//)
mbox.next() //res3: Option[String] = None
There is only one Iterator per open file and only the safe methods are invoked on it. The file text is realized (loaded) only on request and the client gets just what's requested, if available. Instead of all lines in one long String, you could return each line as part of a collection, Seq[String], if that's more applicable.
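A minimal sketch of that Seq[String] variant, written against any Iterator[String] so it can be tried without a file. The names LineChunks, chunks and isStart are hypothetical; the grouping logic mirrors the next() method above, and the line that starts a message is kept as the first element of its chunk.

```scala
object LineChunks {
  // Groups lines into chunks; each chunk begins at a line for which isStart is true
  // (or at the very first line of the input) and runs up to the next such line.
  def chunks(lines: Iterator[String])(isStart: String => Boolean): Iterator[Seq[String]] = {
    val itr = lines.buffered // buffered, so the next line can be peeked via head
    new Iterator[Seq[String]] {
      def hasNext: Boolean = itr.hasNext
      def next(): Seq[String] = {
        val buf = Vector.newBuilder[String]
        buf += itr.next() // the start line is kept as part of its chunk
        while (itr.hasNext && !isStart(itr.head)) buf += itr.next()
        buf.result()
      }
    }
  }
}
```

Only one chunk is held in memory at a time, as long as the caller does not retain earlier chunks.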
UPDATE: This can be modified for easy iteration.
class MBox(filePath :String) extends Iterator[String] {
  private val file = io.Source.fromFile(filePath)
  private val itr = file.getLines().buffered
  private val header = "From .+ \\d{4}".r //adjust to taste
  def next() :String = {
    val sb = new StringBuilder()
    sb.append(itr.next() + "\n")
    while (itr.hasNext && !header.matches(itr.head))
      sb.append(itr.next() + "\n")
    sb.mkString
  }
  def hasNext: Boolean =
    if (itr.hasNext) true else {file.close(); false}
}
Now you can .foreach(), .map(), .flatMap(), etc. But you can also do dangerous things like .toList, which will load the entire file.
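One thing to keep in mind: hasNext closes the file only once the input is exhausted, so a client that stops early (for example with .take(1)) leaves it open. A minimal sketch of one way around that, assuming Scala 2.13+ for scala.util.Using and Regex.matches. The names MboxUsage, messages and firstHeaders are hypothetical, and messages repeats the grouping logic of MBox.next().

```scala
import scala.io.Source
import scala.util.Using

object MboxUsage {
  private val header = "From .+ \\d{4}".r // same pattern as above, adjust to taste

  // Groups lines into messages; one String per message, built lazily.
  def messages(lines: Iterator[String]): Iterator[String] = {
    val itr = lines.buffered
    new Iterator[String] {
      def hasNext: Boolean = itr.hasNext
      def next(): String = {
        val sb = new StringBuilder()
        sb.append(itr.next()).append('\n')
        while (itr.hasNext && !header.matches(itr.head))
          sb.append(itr.next()).append('\n')
        sb.toString
      }
    }
  }

  // Returns the first line of each of the first n messages.
  // Using.resource closes the file even though iteration stops early;
  // toList forces the result before the file is closed.
  def firstHeaders(path: String, n: Int): List[String] =
    Using.resource(Source.fromFile(path)) { src =>
      messages(src.getLines()).take(n).map(_.linesIterator.next()).toList
    }
}
```

Here only the first n messages are read, and the result is materialized inside the Using block so nothing lazy escapes after the file is closed.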