I have a large XML data file (>160 MB) to process, and it seems like SAX/expat/pulldom parsing is the way to go. I'd like to have a thread that sifts through the nodes and pushes them onto a queue, and then other worker threads pull the next available node off the queue and process it.
I have the following (it should have locks, I know - it will, later):
import sys, time
import xml.parsers.expat
import threading

q = []

def start_handler(name, attrs):
    q.append(name)

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1], 'rb') as f:  # binary mode: expat expects bytes
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    print(q)
    time.sleep(1)
The problem is that the body of the while block gets called only once, and then I can't even Ctrl-C interrupt it. On smaller files the output is as expected, but that seems to indicate that the handler only gets called once the document is fully parsed, which seems to defeat the purpose of a SAX parser.
I'm sure it's my own ignorance, but I don't see where I'm making the mistake.
PS: I also tried changing start_handler thus:
def start_handler(name, attrs):
    def app():
        q.append(name)
    u = threading.Thread(group=None, target=app)
    u.start()
No love, though.
ParseFile, as you've noticed, just "gulps down" everything -- no good for the incremental parsing you want to do! So, just feed the file to the parser a bit at a time, making sure to conditionally yield control to other threads as you go -- e.g.:
while True:
    data = f.read(BUFSIZE)
    if not data:
        p.Parse('', True)   # final call: tell expat the document is complete
        break
    p.Parse(data, False)
    time.sleep(0.0)         # yield to other ready threads
The time.sleep(0.0) call is Python's way to say "yield to other threads if any are ready and waiting"; the Parse method is documented in the xml.parsers.expat module documentation.
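For concreteness, here's a minimal sketch of how your do_expat could look with that change; the BUFSIZE value and the binary open mode are my assumptions (any reasonable chunk size works, and expat wants bytes):

import sys, time
import xml.parsers.expat

BUFSIZE = 64 * 1024  # assumed chunk size; tune to taste

q = []

def start_handler(name, attrs):
    q.append(name)  # same handler as in the question

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    with open(sys.argv[1], 'rb') as f:   # binary mode: expat expects bytes
        while True:
            data = f.read(BUFSIZE)
            if not data:
                p.Parse(b'', True)       # final call: document is complete
                break
            p.Parse(data, False)
            time.sleep(0.0)              # give waiting threads a chance to run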
The second point is: forget locks for this usage! Use queue.Queue instead -- it's intrinsically thread-safe and almost invariably the best and simplest way to coordinate multiple threads in Python. Just make a Queue instance q, q.put(name) onto it from the handler, and have worker threads block on q.get() waiting to get some more work to do -- it's SO simple!
(There are several auxiliary strategies you can use to coordinate the termination of worker threads when there's no more work for them to do, but the simplest, absent special requirements, is to just make them daemon threads, so they will all terminate when the main thread does -- see the docs).
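Sketched out, and assuming a hypothetical process(name) function that does the actual per-node work (the worker count of 4 is also arbitrary), the pattern might look like this:

import queue, threading

q = queue.Queue()

def worker():
    while True:
        name = q.get()   # blocks until the parser thread puts something
        process(name)    # hypothetical per-node processing function
        q.task_done()

for _ in range(4):       # assumed worker count
    threading.Thread(target=worker, daemon=True).start()

def start_handler(name, attrs):
    q.put(name)          # the expat handler just enqueues the node name

Because the workers are daemon threads, they die along with the main thread, so no explicit shutdown protocol is needed for this simple case.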
I'm not too sure about this problem. I'm guessing the call to ParseFile is blocking and only the parsing thread is being run, because of the GIL. A way around this would be to use multiprocessing instead; it's designed to work with queues, anyway.
You make a Process and pass it a Queue:
import sys, time
import xml.parsers.expat
import multiprocessing
import queue  # only for the queue.Empty exception

def do_expat(q):
    p = xml.parsers.expat.ParserCreate()

    def start_handler(name, attrs):
        q.put(name)

    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1], 'rb') as f:  # binary mode: expat expects bytes
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

if __name__ == '__main__':
    q = multiprocessing.Queue()
    process = multiprocessing.Process(target=do_expat, args=(q,))
    process.start()

    elements = []
    while True:
        while True:  # drain whatever is currently on the queue
            try:
                elements.append(q.get_nowait())
            except queue.Empty:
                break
        print(elements)
        time.sleep(1)
I've included an elements list just to replicate your original script. Your final solution will probably use get_nowait and a Pool or something similar.
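For instance, here's a rough sketch of spinning up several consumer Processes (plain Processes rather than a Pool, but the same idea), reusing do_expat from above; the process(name) function, the worker count, and the None sentinel used for shutdown are all my assumptions:

import multiprocessing

def worker(q):
    while True:
        name = q.get()
        if name is None:   # sentinel: parsing finished, no more work
            break
        process(name)      # hypothetical per-node processing function

if __name__ == '__main__':
    q = multiprocessing.Queue()
    parser = multiprocessing.Process(target=do_expat, args=(q,))
    parser.start()

    workers = [multiprocessing.Process(target=worker, args=(q,))
               for _ in range(4)]   # assumed worker count
    for w in workers:
        w.start()

    parser.join()          # wait for parsing to finish...
    for _ in workers:
        q.put(None)        # ...then send one sentinel per worker
    for w in workers:
        w.join()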