 

Strange blocking behavior with python multiprocessing queue put() and get()

I have written a class in Python 2.7 (under Linux) that uses multiple processes to manipulate a database asynchronously. I encountered very strange blocking behavior when using multiprocessing.Queue.put() and multiprocessing.Queue.get() that I can't explain.

Here is a simplified version of what I do:

import time
from multiprocessing import Process, Queue

class MyDB(object):  

    def __init__(self):
        self.inqueue = Queue()

        p1 = Process(target = self._worker_process, kwargs={"inqueue": self.inqueue})
        p1.daemon = True
        started = False
        while not started:
            try:
                p1.start()
                started = True
            except:
                time.sleep(1)  

        # Sometimes I also start an identical second process, but it makes no difference to my problem
        p2 = Process(target = self._worker_process, kwargs={"inqueue": self.inqueue})
        # ... (started with the same retry loop as p1 above)

    @staticmethod
    def _worker_process(inqueue):
        while True:
            #-------------- this blocks despite data having arrived ------------

            op = inqueue.get(block = True)
            #do something with specified operation

            #---------------problem area end--------------------
            print "if this text gets printed, the problem was solved"

    def delete_parallel(self, key, rawkey = False):
        someid = ...blahblah
        #-------------- this section blocked when I originally posted the question, but for unknown reasons it works now
        self.inqueue.put({"optype": "delete", "kwargs": {"key":key, "rawkey":rawkey}, "callid": someid}, block = True) 
        #--------------problem area end----------------
        print "if you see this text, there was no blocking or block was released"

If I run the code above inside a test (in which I call delete_parallel on the MyDB object), everything works. But if I run it in the context of my entire application (which imports many other modules, including pygtk), strange things happen:

For some reason self.inqueue.get blocks and never returns, even though self.inqueue has the data in its buffer. When I instead call self.inqueue.get(block = False, timeout = 1), the call raises Queue.Empty even though the queue contains data: qsize() returns 1 (suggesting the data is there), while empty() returns True (suggesting there is none).
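Two documented details of multiprocessing.Queue are relevant here. First, when block is False the timeout argument is ignored, so get(block = False, timeout = 1) behaves like get_nowait(). Second, put() hands the item to a background feeder thread that writes it to the underlying pipe, and the documentation warns that right after a put() there may be a short delay before empty() returns False and get_nowait() can succeed. A minimal sketch (Python 3; get_operation is a hypothetical helper, not part of the question's code) of waiting with a timeout that is actually honored:

```python
import multiprocessing
import queue  # this module is named Queue on Python 2.7


def get_operation(inqueue, timeout=1.0):
    """Return the next item from `inqueue`, or None if none arrives within `timeout` s.

    block=True with a timeout really waits for the feeder thread to deliver the
    item; block=False would ignore the timeout and fail if the item is not yet
    readable from the pipe.
    """
    try:
        return inqueue.get(block=True, timeout=timeout)
    except queue.Empty:
        return None


if __name__ == "__main__":
    q = multiprocessing.Queue()
    q.put({"optype": "delete", "kwargs": {"key": "k1", "rawkey": False}, "callid": 1})
    print(get_operation(q, timeout=5))    # the dict that was put
    print(get_operation(q, timeout=0.1))  # queue is now empty: None
```

An honored timeout turns an indefinite hang into a catchable Queue.Empty, which makes the failure easier to log; it does not by itself explain why an item would never become readable.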

Clearly something elsewhere in my application must be rendering self.inqueue unusable, for example by acquiring some internal semaphore. However, I don't know what to look for, and debugging in Eclipse becomes useless once a blocking semaphore is reached.

Edit 8 (cleaning up and summarizing my previous edits): The last time I had a similar problem, it turned out that pygtk was hijacking the global interpreter lock; I solved it by calling gobject.threads_init() before calling anything else. Could this issue be related?

When I add a print "successful reception" after the get() call and run my application in a terminal, the same behavior occurs at first. When I then terminate it by pressing Ctrl+D, the string "successful reception" suddenly appears in between other messages. To me this looks as if some other process or thread terminates and releases the lock that blocks the process stuck at get().

Since the process that was stuck terminates later, I still see the message. What kind of process could interfere with a Queue from the outside like that? self.inqueue is only accessed inside my class.

Right now it seems to come down to this queue, which won't return anything even though the data is there:

[Image: defect queue]

The get() method seems to get stuck when it attempts to receive the actual data from an internal pipe. The last line reached before my debugger hangs is:

res = self._recv()

which is inside multiprocessing.queues.Queue.get(). Tracing this internal Python code further, I find the assignments self._recv = self._reader.recv and self._reader, self._writer = Pipe(duplex=False).
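The one-way pipe traced above can be exercised on its own. A minimal sketch (Python 3) using the public multiprocessing.Pipe API: the receiving end only has data once something has been written with send(), so a get() stuck in self._recv() points at the writing side (in a Queue, the producer's feeder thread) rather than at the reader. In CPython, Queue.empty() essentially polls this reader end, but that is an implementation detail and may differ between versions.

```python
from multiprocessing import Pipe

# duplex=False gives a one-way pipe: the first end can only receive,
# the second end can only send.
reader, writer = Pipe(duplex=False)

readable_before = reader.poll()     # no timeout: checks once and returns immediately
writer.send({"optype": "delete", "callid": 1})
readable_after = reader.poll(1.0)   # wait up to 1 s for data to become readable
received = reader.recv()            # the call Queue.get() reaches via self._recv()

reader.close()
writer.close()
print(readable_before, readable_after, received)
```

Running it prints False for the first poll and True for the second, followed by the received dict.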

Edit 9: I'm currently trying to hunt down the import that causes it. My application is quite complex, with hundreds of classes that each import many other classes, so it's a painful process. I have found a first candidate class which, when I track all its imports, uses 3 different MyDB instances (but, as far as I can tell, never accesses MyDB.inqueue). The strange thing is that it's basically just a wrapper, and the wrapped class works fine when imported on its own, which also means it uses MyDB without freezing. As soon as I import the wrapper (which imports that class), I get the blocking issue.

I started rewriting the wrapper, gradually reusing the old code. I test every time I add a couple of new lines, hoping to see which line makes the problem return.
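Because each import can have side effects (starting threads, initializing GTK), one way to automate this hunt is to import each candidate module in a fresh interpreter and run a small check after it, with a timeout so that a hang is reported instead of blocking the search. A minimal sketch (Python 3.5+ for subprocess.run); find_blocking_imports, the candidate list and EXAMPLE_CHECK are all hypothetical and would have to be replaced by the application's own modules and a check that actually exercises MyDB:

```python
import subprocess
import sys


def find_blocking_imports(candidates, check_snippet, timeout=10.0):
    """Return the candidate modules after whose import `check_snippet` fails or hangs.

    Every candidate is imported in a fresh interpreter, so side effects of one
    import (threads, locks, GTK initialization) cannot leak into the next run.
    """
    culprits = []
    for name in candidates:
        code = "import {0}\n{1}".format(name, check_snippet)
        try:
            result = subprocess.run(
                [sys.executable, "-c", code],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            culprits.append(name)  # the check hung: the symptom from the question
            continue
        if result.returncode != 0:
            culprits.append(name)  # the check (or the import itself) failed
    return culprits


# Hypothetical placeholder check: a queue round trip with a timeout, so a stuck
# get() raises instead of hanging. A real check would exercise MyDB instead.
EXAMPLE_CHECK = (
    "import multiprocessing\n"
    "q = multiprocessing.Queue()\n"
    "q.put('ping')\n"
    "assert q.get(timeout=5) == 'ping'\n"
)

if __name__ == "__main__":
    # Hypothetical candidate list; replace with the application's own modules.
    print(find_blocking_imports(["json", "sqlite3"], EXAMPLE_CHECK))
```

Running each candidate in its own interpreter is slower than importing them all in one process, but it keeps one module's side effects from masking or causing another's result.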

asked Nov 10 '22 14:11 by evolution


1 Answer

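multiprocessing.Queue uses an internal feeder thread to move the items passed to put() into its underlying pipe. If you are using GTK without enabling its thread support, the GTK main loop can keep that thread from running, so the data never reaches the pipe. You therefore need to call gobject.threads_init() before entering gtk.main().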
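A minimal sketch of where gobject.threads_init() (the PyGTK 2.x call named in the question) belongs, assuming a PyGTK application on Python 2. The gobject module does not exist on Python 3, so the sketch returns False instead of failing when it is missing; init_gtk_threads is a hypothetical helper name. The essential point is only the ordering: the call runs once at startup, before any MyDB instance (and therefore any queue feeder thread) is created and before gtk.main() is entered.

```python
def init_gtk_threads():
    """Enable PyGTK's thread support; return False if PyGTK is not installed."""
    try:
        import gobject  # PyGTK 2.x (Python 2 only)
    except ImportError:
        return False
    # Enables thread support so other Python threads can run while the
    # GTK main loop is waiting for events.
    gobject.threads_init()
    return True


if __name__ == "__main__":
    # 1. Enable thread support first, before anything creates threads or queues.
    init_gtk_threads()
    # 2. Only then build objects that start background threads, e.g.
    #    db = MyDB()   # the class from the question
    # 3. Finally enter the GTK main loop, e.g. gtk.main()
```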

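It should be noted that qsize() only returns an approximate size of the queue: an item is counted as soon as put() accepts it, even before the feeder thread has written it to the pipe. The number of items that can actually be retrieved may therefore be anywhere between 0 and the value returned by qsize().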
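To see how qsize() and empty() can disagree, here is a deliberately simplified toy model (Python 3; ToyFeederQueue, ToyEmpty and feeder_may_run are hypothetical names, not multiprocessing API). put() counts an item and buffers it, a feeder thread later moves it into the "pipe" (a deque here), qsize() reads the count and empty() looks only at the pipe. Holding the feeder back reproduces the question's qsize() == 1 / empty() == True state and a get() that times out. The real queue has nothing like feeder_may_run; there the feeder only stalls if something keeps that thread from being scheduled.

```python
import collections
import threading
import time


class ToyEmpty(Exception):
    """Raised by ToyFeederQueue.get() when nothing reaches the pipe in time."""


class ToyFeederQueue(object):
    """Toy model (not the real implementation) of a queue with a feeder thread."""

    def __init__(self):
        self._cond = threading.Condition()
        self._count = 0                          # plays the role of the queue's counter
        self._buffer = collections.deque()       # accepted by put(), not yet sent
        self._pipe = collections.deque()         # what a reader could actually receive
        self.feeder_may_run = threading.Event()  # cleared = simulated starved feeder
        feeder = threading.Thread(target=self._feed)
        feeder.daemon = True
        feeder.start()

    def put(self, obj):
        with self._cond:
            self._count += 1                     # counted immediately...
            self._buffer.append(obj)             # ...but only buffered
            self._cond.notify_all()

    def _feed(self):
        while True:
            self.feeder_may_run.wait()           # a starved thread never gets past this
            with self._cond:
                while not self._buffer:
                    self._cond.wait()
                self._pipe.append(self._buffer.popleft())
                self._cond.notify_all()

    def qsize(self):
        with self._cond:
            return self._count

    def empty(self):
        with self._cond:
            return not self._pipe

    def get(self, timeout):
        """Wait up to `timeout` seconds for an item to reach the pipe."""
        deadline = time.time() + timeout
        with self._cond:
            while not self._pipe:
                remaining = deadline - time.time()
                if remaining <= 0:
                    raise ToyEmpty()
                self._cond.wait(remaining)
            self._count -= 1
            return self._pipe.popleft()


if __name__ == "__main__":
    q = ToyFeederQueue()
    q.put({"optype": "delete"})
    print(q.qsize(), q.empty())  # 1 True: counted, but not in the pipe yet
    q.feeder_may_run.set()       # let the feeder thread run
    print(q.get(timeout=5))      # the item finally arrives
```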

answered Nov 14 '22 21:11 by Dunes