Process.join() and queue don't work with large numbers [duplicate]

I am trying to split a for loop, i.e.

N = 1000000
for i in xrange(N):
    #do something

using multiprocessing.Process, and it works well for small values of N. The problem arises when I use bigger values of N: something strange happens before or during p.join() and the program stops responding. If I put print i instead of q.put(i) in the body of the function f, everything works well.

I would appreciate any help. Here is the code.

from multiprocessing import Process, Queue

def f(q, nMin, nMax): # function for multiprocessing
    for i in xrange(nMin,nMax):
        q.put(i)

if __name__ == '__main__':

    nEntries = 1000000

    nCpu = 10
    nEventsPerCpu = nEntries/nCpu
    processes = []

    q = Queue()

    for i in xrange(nCpu):
        processes.append(Process(target=f, args=(q, i*nEventsPerCpu, (i+1)*nEventsPerCpu)))

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    print q.qsize()
asked Jul 29 '15 by Puibo

People also ask

What does Process.join() do?

The join method blocks the execution of the main process until the process whose join method was called terminates. Without the join method, the main process won't wait for the child process to terminate.
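
As a minimal sketch (mine, not part of the original page), in the same Python 2 style as the question:

from multiprocessing import Process
import time

def worker():
    time.sleep(1)
    print 'worker done'

if __name__ == '__main__':
    p = Process(target=worker)
    p.start()
    p.join()            # blocks here until worker() returns
    print 'main done'   # always prints after 'worker done'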

What does multiprocessing.Queue() do?

Python's multiprocessing module provides a Queue class that is a first-in, first-out data structure. Queues can store any pickleable Python object (though simple ones are best) and are extremely useful for sharing data between processes.
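
A hedged sketch (mine, not from the original page) of sharing one value through a Queue; note that the item is read before the join, which is exactly the point of the answer below:

from multiprocessing import Process, Queue

def producer(q):
    q.put('hello')              # any pickleable object can be sent

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q,))
    p.start()
    print q.get()               # blocks until the item arrives
    p.join()                    # safe: the queue has already been drained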

How does multiprocessing work in Python?

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
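
As an illustration (my sketch, not part of the original page), the higher-level multiprocessing.Pool API distributes work across several processes with very little code:

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(processes=4)            # four worker processes
    print pool.map(square, range(10))   # splits the work across the pool
    pool.close()
    pool.join()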


1 Answer

You are trying to grow your queue without bounds, and you are joining a subprocess that is waiting for room in the queue. Your main process is therefore stalled waiting for that one to complete, and it never will: the Queue is backed by a pipe with a finite buffer, so once that buffer fills, q.put(i) blocks in the child until someone reads from the queue, the child never finishes, and p.join() never returns.

If you pull data out of the queue before joining, it will work fine.

One technique you could use is something like this:

while 1:
    running = any(p.is_alive() for p in processes)
    while not q.empty():
        process_queue_data(q.get())   # placeholder for whatever you do with each item
    if not running:
        break

According to the documentation, p.is_alive() should perform an implicit join, but it also appears to imply that best practice is to explicitly join all the processes afterward.
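
For instance (my addition, not code from the answer), once the loop above exits you could add:

for p in processes:
    p.join()    # should return immediately, since the queue has been drained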

Edit: Although the code above is quite clear, it may not be all that performant. How to make it faster will be highly task- and machine-specific (and in general, you shouldn't be creating that many processes at a time anyway, unless some are going to be blocked on I/O).

Besides reducing the number of processes to the number of CPUs, some easy fixes to make it a bit faster (again, depending on circumstances) might look like this:

import time
import Queue       # for the Queue.Empty exception (Python 2)

liveprocs = list(processes)
while liveprocs:
    try:
        while 1:
            process_queue_data(q.get(False))   # non-blocking get
    except Queue.Empty:
        pass

    time.sleep(0.5)    # give tasks a chance to put more data in
    if not q.empty():
        continue
    liveprocs = [p for p in liveprocs if p.is_alive()]
answered Nov 15 '22 by Patrick Maupin