I'm trying to make a file like object which is meant to be assigned to sys.stdout/sys.stderr during testing to provide deterministic output. It's not meant to be fast, just reliable. What I have so far almost works, but I need some help getting rid of the last few edge-case errors.
Here is my current implementation.
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from os import getpid
class MultiProcessFile(object):
"""
helper for testing multiprocessing
multiprocessing poses a problem for doctests, since the strategy
of replacing sys.stdout/stderr with file-like objects then
inspecting the results won't work: the child processes will
write to the objects, but the data will not be reflected
in the parent doctest-ing process.
The solution is to create file-like objects which will interact with
multiprocessing in a more desirable way.
All processes can write to this object, but only the creator can read.
This allows the testing system to see a unified picture of I/O.
"""
def __init__(self):
# per advice at:
# http://docs.python.org/library/multiprocessing.html#all-platforms
from multiprocessing import Queue
self.__master = getpid()
self.__queue = Queue()
self.__buffer = StringIO()
self.softspace = 0
def buffer(self):
if getpid() != self.__master:
return
from Queue import Empty
from collections import defaultdict
cache = defaultdict(str)
while True:
try:
pid, data = self.__queue.get_nowait()
except Empty:
break
cache[pid] += data
for pid in sorted(cache):
self.__buffer.write( '%s wrote: %r\n' % (pid, cache[pid]) )
def write(self, data):
self.__queue.put((getpid(), data))
def __iter__(self):
"getattr doesn't work for iter()"
self.buffer()
return self.__buffer
def getvalue(self):
self.buffer()
return self.__buffer.getvalue()
def flush(self):
"meaningless"
pass
... and a quick test script:
#!/usr/bin/python2.6
from multiprocessing import Process
from mpfile import MultiProcessFile
def printer(msg):
print msg
processes = []
for i in range(20):
processes.append( Process(target=printer, args=(i,), name='printer') )
print 'START'
import sys
buffer = MultiProcessFile()
sys.stdout = buffer
for p in processes:
p.start()
for p in processes:
p.join()
for i in range(20):
print i,
print
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
print
print 'DONE'
print
buffer.buffer()
print buffer.getvalue()
This works perfectly 95% of the time, but it has three edge-case problems. I have to run the test script in a fast while-loop to reproduce these.
In the very worst case (odds: one in 70 million), the output would look like this:
START
DONE
302 wrote: '19\n'
32731 wrote: '0 1 2 3 4 5 6 7 8 '
32732 wrote: '0\n'
32734 wrote: '1\n'
32735 wrote: '2\n'
32736 wrote: '3\n'
32737 wrote: '4\n'
32738 wrote: '5\n'
32743 wrote: '6\n'
32744 wrote: '7\n'
32745 wrote: '8\n'
32749 wrote: '9\n'
32751 wrote: '10\n'
32752 wrote: '11\n'
32753 wrote: '12\n'
32754 wrote: '13\n'
32756 wrote: '14\n'
32757 wrote: '15\n'
32759 wrote: '16\n'
32760 wrote: '17\n'
32761 wrote: '18\n'
Exception in thread QueueFeederThread (most likely raised during interpreter shutdown):
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
File "/usr/lib/python2.6/threading.py", line 484, in run
File "/usr/lib/python2.6/multiprocessing/queues.py", line 233, in _feed
<type 'exceptions.TypeError'>: 'NoneType' object is not callable
In python2.7 the exception is slightly different:
Exception in thread QueueFeederThread (most likely raised during interpreter shutdown):
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
File "/usr/lib/python2.7/threading.py", line 505, in run
File "/usr/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
<type 'exceptions.IOError'>: [Errno 32] Broken pipe
How do I get rid of these edge cases?
The solution came in two parts. I've successfully run the test program 200 thousand times without any change in output.
The easy part was to use multiprocessing.current_process()._identity to sort the messages. This is not a part of the published API, but it is a unique, deterministic identifier of each process. This fixed the problem with PIDs wrapping around and giving a bad ordering of output.
The other part of the solution was to use multiprocessing.Manager().Queue() rather than the multiprocessing.Queue. This fixes problem #2 above because the manager lives in a separate Process, and so avoids some of the bad special cases when using a Queue from the owning process. #3 is fixed because the Queue is fully exhausted and the feeder thread dies naturally before python starts shutting down and closes stdin.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With