
Python multiprocessing queue: what to do when the receiving process quits?

Basically I have the following code:

import multiprocessing
import time

class MyProcess(multiprocessing.Process):

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.queue = multiprocessing.Queue()

    def run(self):
        print "Subprocess starting!"
        time.sleep(4)
        print "Subprocess exiting!"

    def addToQueue(self):
        starttime = time.time()
        count = 0
        print "Adding stuff to queue..."
        # Put as many items on the queue as possible for 4 seconds
        while time.time() - starttime < 4:
            self.queue.put("string")
            count += 1
        print "Added %d objects!" % count

        #self.queue.close()


if __name__ == "__main__":
    process = MyProcess()
    process.start()
    print "Waiting for a while"
    time.sleep(2)
    process.addToQueue()
    time.sleep(1)
    print "Child process state: %d" % process.is_alive()

When the main process finishes, it doesn't quit: nothing happens, it just blocks. The only way I found to make it quit was to kill it (SIGTERM isn't enough; it takes SIGKILL).

If I uncomment the self.queue.close() line, it quits, but raises an IOError.

I looked at the code for multiprocessing.Queue: it writes to an os.pipe() from a separate feeder thread (a threading.Thread). What I suspect is that this thread blocks when writing to the pipe, and that calling the close() method is what triggers the IOError.
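
A quick way to see that feeder thread in action (a minimal sketch, separate from my program above): put() returns almost immediately even for a payload much larger than a typical pipe buffer, because the pickling and the pipe write happen on the background thread.

import multiprocessing
import time

q = multiprocessing.Queue()
payload = "x" * (256 * 1024)  # much larger than a typical 64 KiB pipe buffer

start = time.time()
q.put(payload)  # returns right away; the feeder thread does the actual pipe write
print "put() returned after %.6f seconds" % (time.time() - start)
print "payload intact: %s" % (q.get() == payload)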

So my question is: is there a cleaner way to handle this?

I mean, I have a scenario where a queue is constantly being written to. When the receiving process quits (cleanly or not), should I just close the queue and accept the IOError on the sender process?
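
For reference, multiprocessing.Queue also has a cancel_join_thread() method, which tells the queue not to wait for the feeder thread to flush its buffered data when the process exits. A minimal sketch of a sender that is allowed to drop whatever is still buffered (illustrative only, not my original code):

import multiprocessing

queue = multiprocessing.Queue()
for i in xrange(100000):
    queue.put("string")

# Let the process exit even though the feeder thread still has unsent
# data buffered; anything not yet written to the pipe is lost.
queue.cancel_join_thread()
queue.close()

The trade-off is data loss: anything the feeder thread had not yet pushed through the pipe is silently discarded.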

Edit: the output of the program:

Waiting for a while
Subprocess starting!
Adding stuff to queue...
Subprocess exiting!
Added 1822174 objects!
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe
Child process state: 0

This part of the output only appears when the commented self.queue.close() line is used:

Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe
asked by felipou


1 Answer

I'm answering my own question, since not everybody reads the comments. After a hint from user mata in the comments, I tested the sample code from the question with a call to time.sleep(0.01) added inside the loop, to limit the number of objects that would be added to the queue:

    def addToQueue(self):
        starttime = time.time()
        count = 0
        print "Adding stuff to queue..."
        while time.time() - starttime < 4:
            self.queue.put("string")
            count += 1
            time.sleep(0.01)  # throttle the puts
        print "Added %d objects!" % count

So, when the number of objects is low (fewer than about 3800 in this example), the process quits normally. But when there are lots of objects, the feeder thread seems to block writing to the pipe between the processes.
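
That threshold lines up with the size of the underlying pipe buffer: on a typical Linux system it is 64 KiB, and each pickled "string" plus its length header is on the order of 17 bytes, so 65536 / 17 ≈ 3850, close to the observed cutoff. A minimal sketch to measure the buffer size (assuming Linux; this uses os.pipe() and fcntl directly and is not part of the test above):

import os
import errno
import fcntl

r, w = os.pipe()
fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)  # fail instead of blocking when full

total = 0
try:
    while True:
        total += os.write(w, "x" * 1024)
except OSError, e:
    if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
        raise

print "pipe buffer holds %d bytes" % total  # typically 65536 on Linux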

But that raises another question for me: is this a bug? Should I report it? Or is this just normal expected behavior?

Big thanks to user mata for pointing out this possibility!

answered by felipou