In the last month, we've had a persistent problem with the Python 2.6.x multiprocessing package when we've tried to use it to share a queue among several different (Linux) computers. I've posed this question directly to Jesse Noller as well, since we haven't yet found anything that elucidates the issue on StackOverflow, in the Python docs, in the source code, or elsewhere online.
Our team of engineers hasn't been able to solve this one, and we've posed the question to quite a few people in python user groups to no avail. I was hoping someone could shed some insight, since I feel like we're doing something incorrect but are too close to the problem to see it for what it is.
Here's the symptom:
Traceback (most recent call last):
File "/var/django_root/dev/com/brightscope/data/processes/daemons/deferredupdates/servers/queue_server.py", line 65, in get_from_queue
return queue, queue.get(block=False)
File "<string>", line 2, in get
File "/usr/local/lib/python2.6/multiprocessing/managers.py", line 725, in _callmethod
conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe
(I'm showing where our code calls queue.get() on a shared queue object, hosted by a manager that extends SyncManager.)
What's peculiar about the issue is that if we connect to this shared queue on a single machine (let's call this machine A), even from lots of concurrent processes, we never seem to run into an issue. It's only when we connect to the queue (again, using a class that extends multiprocessing's SyncManager and currently adds no additional functionality) from other machines (let's call these machines B and C) and run a high volume of items into and out of the queue at the same time that we experience a problem.
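For reference, here is a minimal sketch of this kind of setup (all names, the port, and the authkey are illustrative, not our actual code): a SyncManager subclass hosts one shared queue, machine A runs the server, and machines A/B/C connect with manager.connect():

```python
import threading
import queue  # Queue.Queue in Python 2.6
from multiprocessing.managers import SyncManager

shared_queue = queue.Queue()

class QueueManager(SyncManager):
    pass

# Expose the one shared queue; clients retrieve a proxy to it.
QueueManager.register('get_queue', callable=lambda: shared_queue)

def start_server(host='127.0.0.1', port=0, authkey=b'secret'):
    """Host the queue (machine A). Returns the bound (host, port)."""
    manager = QueueManager(address=(host, port), authkey=authkey)
    server = manager.get_server()
    # serve_forever() blocks, so run it in a background thread here;
    # in production this is a standalone server process.
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address

def connect_client(host, port, authkey=b'secret'):
    """Connect (from machine A, B, or C) and return a queue proxy."""
    manager = QueueManager(address=(host, port), authkey=authkey)
    manager.connect()
    return manager.get_queue()
```

Every call to the proxy's put()/get() goes through the proxy's socket to the server process, which is where the Broken pipe surfaces when that connection dies.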
It is as though Python's multiprocessing package handles local connections (even though they still use the same manager.connect() connection method) in a manner that works from machine A, but when remote connections are made simultaneously from at least one of machines B or C, we get a Broken pipe error.
In all the reading my team has done, we thought the problem was related to locking. We thought maybe we shouldn't use Queue.Queue but instead multiprocessing.Queue, but we switched and the problem persisted (we also noticed that SyncManager's own shared Queue is an instance of Queue.Queue).
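To make that distinction concrete, here's a hedged illustration (not our code) of the two queue flavors: a raw multiprocessing.Queue, which is pipe-based and machine-local, versus a manager's proxied queue, which is the flavor that can travel over the network:

```python
import multiprocessing

def demo():
    # multiprocessing.Queue: backed by an OS pipe plus a feeder thread,
    # usable only between processes on the same machine.
    mp_q = multiprocessing.Queue()
    mp_q.put('local')
    local_item = mp_q.get()

    # A manager's Queue: a proxy to a Queue.Queue (queue.Queue in
    # Python 3) living inside the manager's server process, reached
    # over a socket -- this is what gets shared across machines.
    mgr = multiprocessing.Manager()
    proxied_q = mgr.Queue()
    proxied_q.put('proxied')
    proxied_item = proxied_q.get()
    mgr.shutdown()

    return local_item, proxied_item
```

Switching between the two therefore doesn't change the networked path at all, which may be why the problem persisted.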
We are pulling our hair out about how to even debug the issue, since it's hard to reproduce but does happen fairly frequently (many times per day if we are inserting and .get()ing lots of items from the queue).
The method we created, get_from_queue, attempts to retry acquiring the item from a queue ~10 times with randomized sleep intervals, but it seems like if it fails once, it will fail all ten times (which led me to believe that .register()ing and .connect()ing to a manager perhaps doesn't give another socket connection to the server, but I couldn't confirm this either by reading the docs or looking at the Python internal source code).
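That retry loop looks roughly like the following (a hypothetical reconstruction, not the actual queue_server.py code; the exception types and delays are assumptions):

```python
import random
import time

def get_from_queue(shared_queue, retries=10):
    """Try to pull one item off the (proxied) queue, retrying with a
    randomized back-off when the proxy's connection fails."""
    last_error = None
    for attempt in range(retries):
        try:
            return shared_queue.get(block=False)
        except (IOError, EOFError) as exc:  # e.g. [Errno 32] Broken pipe
            last_error = exc
            time.sleep(random.uniform(0.01, 0.1) * (attempt + 1))
    # As observed above: once the pipe is broken, every retry on the
    # same proxy object fails, so we end up re-raising.
    raise last_error
```

The key point is that every attempt reuses the same proxy, and hence the same underlying socket, which would explain all ten attempts failing together.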
Can anyone provide any insight into where we might look or how we might track what's actually happening?
How can we start a new connection in the event of a broken pipe using multiprocessing.BaseManager or multiprocessing.SyncManager?
How can we prevent the broken pipe in the first place?
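One pattern worth trying (a sketch under the assumption that each proxy caches its own socket to the manager server, so recovery means discarding both the stale proxy and its manager): build a brand-new manager and proxy whenever EPIPE appears. The names and typeid below are illustrative:

```python
from multiprocessing.managers import SyncManager

class QueueManager(SyncManager):
    pass

# Client side only needs the typeid; the server supplies the callable.
QueueManager.register('get_queue')

def fresh_queue(address, authkey=b'secret'):
    """Open a new manager (and hence a new socket); return a fresh proxy."""
    manager = QueueManager(address=address, authkey=authkey)
    manager.connect()
    return manager.get_queue()

def safe_get(queue_factory, retries=3):
    """queue.get() that rebuilds the connection on a broken pipe.

    queue_factory is a zero-argument callable returning a queue proxy,
    e.g. lambda: fresh_queue(('machineA', 50000)).
    """
    q = queue_factory()
    last_error = None
    for attempt in range(retries):
        try:
            return q.get(block=False)
        except (IOError, EOFError) as exc:
            last_error = exc
            q = queue_factory()  # reconnect with a brand-new socket
    raise last_error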
FYI, in case anyone else runs into this same error: after extensive consulting with Ask Solem and Jesse Noller of Python's core dev team, it looks like this is actually a bug in current Python 2.6.x (and possibly 2.7+ and possibly 3.x). They are looking at possible solutions, and a fix will probably be included in a future version of Python.