How to get a streaming Iterator[Node] from a large XML document?

I need to process XML documents that consist of a very large number of independent records, e.g.

<employees>
    <employee>
         <firstName>Kermit</firstName>
         <lastName>Frog</lastName>
         <role>Singer</role>
    </employee>
    <employee>
         <firstName>Oscar</firstName>
         <lastName>Grouch</lastName>
         <role>Garbageman</role>
    </employee>
    ...
</employees>

In some cases these are just big files, but in others they may come from a streaming source.

I can't just scala.xml.XML.load() it, because I don't want to hold the whole document in memory (or wait for the input stream to close) when I only need to work with one record at a time. I know I can use XMLEventReader to stream the input as a sequence of XMLEvents. These are, however, much less convenient to work with than scala.xml.Node.

So I'd like to get a lazy Iterator[Node] out of this somehow, in order to operate on each individual record using the convenient Scala syntax, while keeping memory usage under control.

To do this myself, I could start with an XMLEventReader, build up a buffer of events between each matching start and end tag, and then construct a Node tree from that buffer. But is there an easier way that I've overlooked? Thanks for any insights!
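For reference, the buffering approach described above can be sketched roughly as follows. This is a minimal sketch, not a library facility: `records` and everything inside it are made-up names, and it assumes scala-xml 1.x, where scala.xml.pull.XMLEventReader still exists. Comments and entity references are silently dropped for brevity.

```scala
import scala.io.Source
import scala.xml._
import scala.xml.pull._

// Buffer events on a stack and rebuild an Elem each time a record closes.
def records(src: Source, label: String): Iterator[Node] = {
  val events = new XMLEventReader(src) // note: spawns a producer thread
  new Iterator[Node] {
    private var pending: Option[Node] = None
    private var stack = List.empty[(String, MetaData, NodeBuffer)]

    private def advance(): Unit = {
      pending = None
      while (pending.isEmpty && events.hasNext) {
        events.next() match {
          case EvElemStart(_, l, attrs, _) if stack.nonEmpty || l == label =>
            stack = (l, attrs, new NodeBuffer) :: stack // open a nested element
          case EvText(t) if stack.nonEmpty =>
            stack.head._3 += Text(t) // text inside the current record
          case EvElemEnd(_, _) if stack.nonEmpty =>
            val (lbl, attrs, kids) = stack.head
            stack = stack.tail
            val node = Elem(null, lbl, attrs, TopScope, minimizeEmpty = true, kids: _*)
            stack match {
              case (_, _, parentKids) :: _ => parentKids += node  // roll up
              case Nil                     => pending = Some(node) // record done
            }
          case _ => // ignore everything outside a record
        }
      }
    }

    advance()
    def hasNext = pending.isDefined
    def next() = { val n = pending.get; advance(); n }
  }
}
```

With the sample document, `records(src, "employee")` would yield each `<employee>` element lazily, one at a time.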

asked Dec 15 '11 by David Soergel




2 Answers

You can use ConstructingParser, which builds on the same underlying parsing machinery as XMLEventReader, and process your employee nodes below the top level with a callback. You just have to be careful to discard the data as soon as it has been processed:

import scala.io.Source
import scala.xml._

def processSource[T](input: Source)(f: NodeSeq => T) {
  new scala.xml.parsing.ConstructingParser(input, false) {
    nextch // initialize, per the MarkupParser documentation
    document // trigger parsing by requesting the document

    var depth = 0 // track depth

    override def elemStart(pos: Int, pre: String, label: String,
        attrs: MetaData, scope: NamespaceBinding) {
      super.elemStart(pos, pre, label, attrs, scope)
      depth += 1
    }
    override def elemEnd(pos: Int, pre: String, label: String) {
      depth -= 1
      super.elemEnd(pos, pre, label)
    }
    override def elem(pos: Int, pre: String, label: String, attrs: MetaData,
        pscope: NamespaceBinding, nodes: NodeSeq): NodeSeq = {
      val node = super.elem(pos, pre, label, attrs, pscope, nodes)
      depth match {
        case 1 => <dummy/> // dummy final roll up
        case 2 => f(node); NodeSeq.Empty // process and discard employee nodes
        case _ => node // roll up other nodes
      }
    }
  }
}

Then you use it like this to process each node at the second level in constant memory (assuming the second-level nodes don't themselves have an arbitrarily large number of children):

processSource(src){ node =>
  // process here
  println(node)
}

The benefit compared to XMLEventReader is that no second thread is involved. You also don't have to parse each node twice, as in your proposed solution. The drawback is that this relies on the inner workings of ConstructingParser.

answered Sep 20 '22 by huynhjl


To get from huynhjl's generator solution to a TraversableOnce[Node], use this trick:

def generatorToTraversable[T](func: (T => Unit) => Unit) = 
  new Traversable[T] {
    def foreach[X](f: T => X) {
      func(f(_))
    }
  }

def firstLevelNodes(input: Source): TraversableOnce[Node] =
  generatorToTraversable(processSource(input))

The result of generatorToTraversable is not traversable more than once (even though a new ConstructingParser is instantiated on each foreach call), because the input stream is a Source, which is an Iterator and is consumed by the first traversal. We can't override Traversable.isTraversableAgain, though, because it's final.

Really we'd like to enforce this by just returning an Iterator. However, both Traversable.toIterator and Traversable.view.toIterator make an intermediate Stream, which will cache all the entries (defeating the whole purpose of this exercise). Oh well; I'll just let the stream throw an exception if it's accessed twice.

Also note the whole thing isn't thread safe.

This code runs great, and I believe the overall solution to be both lazy and not caching (hence constant memory), though I haven't tried it on a large input yet.
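To see why the trick works independently of the XML machinery, here is a minimal self-contained illustration. `countTo` is a made-up stand-in for processSource; unlike a Source, it is pure, so this toy version happens to be traversable more than once. It assumes Scala 2.12 or earlier, since Traversable was dropped in 2.13.

```scala
// Wrap a callback-style generator as a Traversable by delegating foreach.
def generatorToTraversable[T](func: (T => Unit) => Unit) =
  new Traversable[T] {
    def foreach[X](f: T => X) { func(f(_)) }
  }

// A toy generator: pushes 1..n to a callback, analogous to how
// processSource pushes each parsed node to its callback.
def countTo(n: Int)(emit: Int => Unit): Unit =
  for (i <- 1 to n) emit(i)

val nums = generatorToTraversable(countTo(3))
println(nums.toList) // List(1, 2, 3)
```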

answered Sep 24 '22 by David Soergel