 

python multiprocessing - process hangs on join for large queue

I'm running python 2.7.3 and I noticed the following strange behavior. Consider this minimal example:

from multiprocessing import Process, Queue

def foo(qin, qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        qout.put({'bar': bar})

if __name__ == '__main__':
    import sys

    qin = Queue()
    qout = Queue()
    worker = Process(target=foo, args=(qin, qout))
    worker.start()

    for i in range(100000):
        print i
        sys.stdout.flush()
        qin.put(i**2)

    qin.put(None)
    worker.join()

When I loop over 10,000 or more, my script hangs on worker.join(). It works fine when the loop only goes to 1,000.

Any ideas?

asked Feb 08 '14 by user545424


People also ask

Why is multiprocessing so slow in Python?

This is usually because the Python GIL (Global Interpreter Lock) prevents threads from executing Python bytecode in parallel. Better CPU utilisation can be achieved with concurrent.futures.ProcessPoolExecutor or multiprocessing.Process, which sidestep the GIL by running the work in separate processes, each with its own interpreter.
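For example (a minimal sketch, not from the question; the function name is made up, and concurrent.futures is standard library on Python 3, available for 2.7 via the futures backport), a CPU-bound function can be fanned out across processes like this:

from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n):
    # Pure-Python CPU work; an arbitrary task used only for illustration.
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as pool:
        # Each call runs in its own process with its own GIL.
        results = list(pool.map(cpu_heavy, [10 ** 6] * 8))
    print(len(results))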

What does join do in Python multiprocessing?

The join() method blocks the calling process until the process whose join() method is called terminates. Without join(), the main process would not wait for the child process to finish before continuing (or exiting).
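A minimal sketch (the function and messages are made up for illustration) of what join() does:

from multiprocessing import Process
import time

def work():
    time.sleep(1)
    print('child: done')

if __name__ == '__main__':
    p = Process(target=work)
    p.start()
    p.join()   # blocks here until work() has returned and the child has exited
    print('parent: runs only after the child has finished')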

Is multiprocessing queue thread safe?

Yes. multiprocessing.Queue is both thread-safe and process-safe: multiple processes (or threads) may get() and put() items concurrently without fear of a race condition.
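For instance (a small sketch with made-up names), several producer processes can put() into one queue while the parent get()s the results:

from multiprocessing import Process, Queue

def producer(q, offset):
    # Many producers may put() concurrently; Queue does the locking for us.
    for i in range(5):
        q.put(offset + i)

if __name__ == '__main__':
    q = Queue()
    procs = [Process(target=producer, args=(q, k * 100)) for k in range(3)]
    for p in procs:
        p.start()
    items = [q.get() for _ in range(15)]   # drain before joining (see the answers below)
    for p in procs:
        p.join()
    print(sorted(items))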

Is multiprocessing faster in Python?

Multiprocessing is faster when the program is CPU-bound, because the work runs in parallel across cores. When the program spends most of its time waiting on I/O, threading is usually the better fit: the GIL is released while waiting, and processes add extra startup and communication overhead.
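A rough way to see this for yourself (a sketch with an arbitrary workload; absolute timings will vary by machine):

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def burn(n):
    # Pure-Python arithmetic: CPU-bound, so threads serialise on the GIL.
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(executor_cls, label):
    start = time.time()
    with executor_cls(max_workers=4) as pool:
        list(pool.map(burn, [2 * 10 ** 6] * 4))
    print(label, round(time.time() - start, 2), 'seconds')

if __name__ == '__main__':
    timed(ThreadPoolExecutor, 'threads')
    timed(ProcessPoolExecutor, 'processes')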


2 Answers

The qout queue in the subprocess gets full. The data you put in it from foo() doesn't fit in the buffer of the OS's pipes used internally, so the subprocess blocks trying to fit more data. But the parent process is not reading this data: it is simply blocked too, waiting for the subprocess to finish. This is a typical deadlock.
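In other words, this is the situation the multiprocessing docs warn about under "joining processes that use queues": drain qout before calling join(). A minimal sketch of the question's example with that fix (the sentinel on qout is my addition, not part of the original code):

from multiprocessing import Process, Queue

def foo(qin, qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        qout.put({'bar': bar})
    qout.put(None)   # sentinel: tells the parent that no more results are coming

if __name__ == '__main__':
    qin = Queue()
    qout = Queue()
    worker = Process(target=foo, args=(qin, qout))
    worker.start()

    for i in range(100000):
        qin.put(i ** 2)
    qin.put(None)

    # Read the results *before* joining, so the child never blocks on a full pipe.
    results = []
    while True:
        item = qout.get()
        if item is None:
            break
        results.append(item)

    worker.join()
    print(len(results))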

answered Sep 24 '22 by Armin Rigo


There seems to be a limit on how much data the queues can hold before they block. Consider the following modification:

from multiprocessing import Process, Queue

def foo(qin, qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        #qout.put({'bar': bar})

if __name__ == '__main__':
    import sys

    qin = Queue()
    qout = Queue()   ## POSITION 1
    for i in range(100):
        #qout = Queue()   ## POSITION 2
        worker = Process(target=foo, args=(qin, qout))
        worker.start()
        for j in range(1000):
            x = i * 1000 + j
            print x
            sys.stdout.flush()
            qin.put(x**2)

        qin.put(None)
        worker.join()

    print 'Done!'

This works as-is (with the qout.put line commented out). If you try to save all 100000 results, then qout becomes too large: if I uncomment the qout.put({'bar': bar}) in foo and leave the definition of qout in POSITION 1, the code hangs. If, however, I move the qout definition to POSITION 2 (a fresh queue per worker), then the script finishes.

So in short, you have to be careful that neither qin nor qout becomes too large. (See also: Multiprocessing Queue maxsize limit is 32767)
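Another option (a sketch of one possible pattern, not the answer's code) is to drain qout while you are still filling qin, so that neither queue's pipe buffer gets a chance to fill up:

from multiprocessing import Process, Queue

def foo(qin, qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        qout.put({'bar': bar})

if __name__ == '__main__':
    qin = Queue()
    qout = Queue()
    worker = Process(target=foo, args=(qin, qout))
    worker.start()

    results = []
    for i in range(100000):
        qin.put(i ** 2)
        # Opportunistically pull results as we go so qout stays small.
        while not qout.empty():
            results.append(qout.get())

    qin.put(None)
    # Collect whatever is still in flight, then it is safe to join.
    while len(results) < 100000:
        results.append(qout.get())
    worker.join()
    print(len(results))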

answered Sep 26 '22 by amd