I have written a class in Python 2.7 (under Linux) that uses multiple processes to manipulate a database asynchronously. I encountered a very strange blocking behaviour when using multiprocessing.Queue.put() and multiprocessing.Queue.get() which I can't explain.
Here is a simplified version of what I do:

import time
from multiprocessing import Process, Queue

class MyDB(object):
    def __init__(self):
        self.inqueue = Queue()
        p1 = Process(target = self._worker_process, kwargs={"inqueue": self.inqueue})
        p1.daemon = True
        started = False
        while not started:
            try:
                p1.start()
                started = True
            except:
                time.sleep(1)
        #Sometimes I start an identical second process but it makes no difference to my problem
        p2 = Process(target = self._worker_process, kwargs={"inqueue": self.inqueue})
        #blahblah... (same as above)

    @staticmethod
    def _worker_process(inqueue):
        while True:
            #--------------this blocks despite data having arrived------------
            op = inqueue.get(block = True)
            #do something with specified operation
            #---------------problem area end--------------------
            print "if this text gets printed, the problem was solved"

    def delete_parallel(self, key, rawkey = False):
        someid = ...blahblah
        #--------------this section blocked when I was posting the question but for unknown reasons it's fine now
        self.inqueue.put({"optype": "delete", "kwargs": {"key":key, "rawkey":rawkey}, "callid": someid}, block = True)
        #--------------problem area end----------------
        print "if you see this text, there was no blocking or block was released"

If I run the code above inside a test (in which I call delete_parallel on the MyDB object) then everything works, but if I run it in the context of my entire application (importing other stuff, including pygtk), strange things happen:
For some reason self.inqueue.get blocks and never releases despite self.inqueue having the data in its buffer. When I instead call self.inqueue.get(block = False, timeout = 1) then the call finishes by raising Queue.Empty, despite the queue containing data. qsize() returns 1 (suggesting that data is there) while empty() returns True (suggesting that there is no data).
Now clearly there must be something somewhere else in my application that renders self.inqueue unusable by causing acquisition of some internal semaphore. However, I don't know what to look for. Eclipse debugging becomes useless once a blocking semaphore is reached.
Edit 8 (cleaning up and summarizing my previous edits)
Last time I had a similar problem, it turned out that pygtk was hijacking the global interpreter lock, but I solved it by calling gobject.threads_init() before I called anything else. Could this issue be related?
When I introduce a print "successful reception" after the get() method and execute my application in a terminal, the same behaviour happens at first. When I then terminate by pressing CTRL+D I suddenly get the string "successful reception" in between messages. This looks to me like some other process/thread is terminated and releases the lock that blocks the process that is stuck at get().
Since the process that was stuck terminates later, I still see the message. What kind of process could externally mess with a Queue like that? self.inqueue is only accessed inside my class.
Right now it seems to come down to this queue which won't return anything despite the data being there: the get() method seems to get stuck when it attempts to receive the actual data from some internal pipe. The last line before my debugger hangs is:
res = self._recv()
which is inside of multiprocessing.queues.Queue.get(). Tracking this internal Python stuff further I find the assignments
self._recv = self._reader.recv
and
self._reader, self._writer = Pipe(duplex=False)
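
To make those two assignments concrete, here is a small sketch of the same one-way pipe used on its own, in a single process. The function name pipe_roundtrip and its timeout parameter are hypothetical. It relies on poll() to check for data first, because a bare recv() blocks until something has been written to the other end, which matches the hang described above.

```python
from multiprocessing import Pipe

def pipe_roundtrip(obj, timeout=1.0):
    """Send a small object through a one-way pipe and read it back."""
    # With duplex=False the first connection can only receive
    # and the second can only send.
    reader, writer = Pipe(duplex=False)
    try:
        writer.send(obj)  # pickles obj and writes it into the pipe
        # poll() reports whether data is ready, so recv() below cannot hang.
        if not reader.poll(timeout):
            return None
        return reader.recv()  # the same call the queue stores as self._recv
    finally:
        reader.close()
        writer.close()
```

This only works in one process for small objects that fit in the pipe buffer. Inside a queue, put() only appends to an in-memory buffer, and the write to the pipe happens later on a background thread, so a get() stuck in recv() suggests that write never happened.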
Edit 9
I'm currently trying to hunt down the import that causes it. My application is quite complex with hundreds of classes and each class importing a lot of other classes, so it's a pretty painful process. I have found a first candidate class which uses 3 different MyDB instances when I track all its imports (but doesn't access MyDB.inqueue at any time as far as I can tell). The strange thing is, it's basically just a wrapper and the wrapped class works just fine when imported on its own. This also means that it uses MyDB without freezing. As soon as I import the wrapper (which imports that class), I have the blocking issue.
I started rewriting the wrapper by gradually reusing the old code. I'm testing each time I introduce a couple of new lines, until I hopefully see which line causes the problem to return.
multiprocessing.Queue uses an internal feeder thread to move buffered items into its pipe. If you are using GTK then it will break this thread unless threading support has been initialized, so you will need to call gobject.threads_init().
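
A rough sketch of where that call would go, assuming the PyGTK 2.x gobject module and using the function name from the question, gobject.threads_init(). The wrapper init_gtk_threads and the GTK_THREADS_READY flag are hypothetical names. The import is guarded so the snippet also runs where PyGTK is not installed.

```python
def init_gtk_threads():
    """Call gobject.threads_init() if PyGTK's gobject module is importable.

    Returns True if the call was made, False if gobject is not installed.
    """
    try:
        import gobject  # PyGTK 2.x binding; absent on most Python 3 installs
    except ImportError:
        return False
    gobject.threads_init()
    return True

# Intended to run at the very top of the application's entry point,
# before gtk is imported and before any MyDB instance creates its Queue.
GTK_THREADS_READY = init_gtk_threads()
```

Whether this resolves the blocking get() depends on the import order in the full application, so it is worth confirming that no module imports gtk before this line runs.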
It should be noted that qsize() only returns an approximate size of the queue. The real size may be anywhere between 0 and the value returned by qsize().
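
Because of that, code that needs to know whether anything is available is usually safer attempting a get() with a timeout than branching on qsize() or empty(). The following is a minimal sketch of that idea. The helper name drain and its default timeout are hypothetical choices, not part of the multiprocessing API.

```python
from multiprocessing import Queue

try:
    from Queue import Empty   # Python 2
except ImportError:
    from queue import Empty   # Python 3

def drain(q, timeout=0.5):
    """Collect items until none arrives within `timeout` seconds."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except Empty:
            # Nothing arrived in time; treat the queue as drained for now.
            return items
```

The timeout gives the queue's background thread time to push a freshly put item into the pipe, which is the window in which qsize() and empty() can disagree.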