Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Fully streaming XML parser

I'm trying to consume the Exchange GetAttachment webservice using requests, lxml and base64io. This service returns a base64-encoded file in a SOAP XML HTTP response. The file content is contained in a single line in a single XML element. GetAttachment is just an example, but the problem is more general.

I would like to stream the decoded file contents directly to disk without storing the entire contents of the attachment in-memory at any point, since an attachment could be several 100 MB.

I have tried something like this:

r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
with open('foo.txt', 'wb') as f:
    for action, elem in lxml.etree.iterparse(GzipFile(fileobj=r.raw)):
    if elem.tag == 't:Content':
        b64_encoder = Base64IO(BytesIO(elem.text))
        f.write(b64_encoder.read())

but lxml still stores a copy of the attachment as elem.text. Is there any way I can create a fully streaming XML parser that also streams the content of an element directly from the input stream?

like image 654
Erik Cederstrand Avatar asked Jan 28 '23 06:01

Erik Cederstrand


1 Answers

Don't use iterparse in this case. The iterparse() method can only issue element start and end events, so any text in an element is given to you when the closing XML tag has been found.

Instead, use a SAX parser interface. This is a general standard for XML parsing libraries, to pass on parsed data to a content handler. The ContentHandler.characters() callback is passed character data in chunks (assuming that the implementing XML library actually makes use of this possibility). This is a lower level API from the ElementTree API, and and the Python standard library already bundles the Expat parser to drive it.

So the flow then becomes:

  • wrap the incoming request stream in a GzipFile for easy decompression. Or, better still, set response.raw.decode_content = True and leave decompression to the requests library based on the content-encoding the server has set.
  • Pass the GzipFile instance or raw stream to the .parse() method of a parser created with xml.sax.make_parser(). The parser then proceeds to read from the stream in chunks. By using make_parser() you first can enable features such as namespace handling (which ensures your code doesn't break if Exchange decides to alter the short prefixes used for each namespace).
  • The content handler characters() method is called with chunks of XML data; check for the correct element start event, so you know when to expect base64 data. You can decode that base64 data in chunks of (a multiple of) 4 characters at a time, and write it to a file. I'd not use base64io here, just do your own chunking.

A simple content handler could be:

from xml.sax import handler
from base64 import b64decode

class AttachmentContentHandler(handler.ContentHandler):
    types_ns = 'http://schemas.microsoft.com/exchange/services/2006/types'

    def __init__(self, filename):
        self.filename = filename

    def startDocument(self):
        self._buffer = None
        self._file = None

    def startElementNS(self, name, *args):
        if name == (self.types_ns, 'Content'):
            # we can expect base64 data next
            self._file = open(self.filename, 'wb')
            self._buffer = []

    def endElementNS(self, name, *args):
        if name == (self.types_ns, 'Content'):
            # all attachment data received, close the file
            try:
                if self._buffer:
                    raise ValueError("Incomplete Base64 data")
            finally:
                self._file.close()
                self._file = self._buffer = None

    def characters(self, data):
        if self._buffer is None:
            return
        self._buffer.append(data)
        self._decode_buffer()

    def _decode_buffer(self):
        remainder = ''
        for data in self._buffer:
            available = len(remainder) + len(data)
            overflow = available % 4
            if remainder:
                data = (remainder + data)
                remainder = ''
            if overflow:
                remainder, data = data[-overflow:], data[:-overflow]
            if data:
                self._file.write(b64decode(data))
        self._buffer = [remainder] if remainder else []

and you'd use it like this:

import requests
from xml.sax import make_parser, handler

parser = make_parser()
parser.setFeature(handler.feature_namespaces, True)
parser.setContentHandler(AttachmentContentHandler('foo.txt'))

r = requests.post('https://example.com/EWS/Exchange.asmx', data=..., stream=True)
r.raw.decode_content = True  # if content-encoding is used, decompress as we read
parser.parse(r.raw)

This will parse the input XML in chunks of up to 64KB (the default IncrementalParser buffer size), so attachment data is decoded in at most 48KB blocks of raw data.

I'd probably extend the content handler to take a target directory and then look for <t:Name> elements to extract the filename, then use that to extract the data to the correct filename for each attachment found. You'd also want to verify that you are actually dealing with a GetAttachmentResponse document, and handle error responses.

like image 124
Martijn Pieters Avatar answered Jan 29 '23 18:01

Martijn Pieters