I am trying to split a for loop, i.e.
N = 1000000
for i in xrange(N):
    # do something
using multiprocessing.Process, and it works well for small values of N. Problems arise when I use bigger values of N: something strange happens before or during p.join() and the program stops responding. If I put print i instead of q.put(i) in the definition of the function f, everything works well.
I would appreciate any help. Here is the code.
from multiprocessing import Process, Queue

def f(q, nMin, nMax): # function for multiprocessing
    for i in xrange(nMin, nMax):
        q.put(i)

if __name__ == '__main__':
    nEntries = 1000000
    nCpu = 10
    nEventsPerCpu = nEntries/nCpu
    processes = []
    q = Queue()
    for i in xrange(nCpu):
        processes.append( Process( target=f, args=(q,i*nEventsPerCpu,(i+1)*nEventsPerCpu) ) )
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print q.qsize()
The join method blocks the execution of the main process until the process whose join method is called terminates. Without the join method, the main process won't wait until the process gets terminated.
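A minimal sketch of that blocking behaviour, written for Python 3 (the code in the question is Python 2). The `worker` function, the `run_and_join` wrapper, and the delay value are made up for illustration:

```python
import time
from multiprocessing import Process


def worker(delay):
    """Hypothetical task: just sleep for `delay` seconds."""
    time.sleep(delay)


def run_and_join(delay=0.2):
    """Start one child, join it, and return (is_alive, exitcode) afterwards."""
    p = Process(target=worker, args=(delay,))
    p.start()
    p.join()  # the parent blocks here until the child has exited
    return p.is_alive(), p.exitcode


if __name__ == "__main__":
    print(run_and_join())
```

After `join()` returns, the child is no longer alive and its exit code is available; without the `join()` call the parent would carry on immediately.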
Python's multiprocessing module provides a Queue class that is a first-in, first-out data structure. A queue can store any picklable Python object (though simple ones are best) and is extremely useful for sharing data between processes.
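As a small illustration of the first-in, first-out behaviour, here is a Python 3 sketch that uses a single process only. The `fifo_roundtrip` helper and the 5-second timeout are assumptions for the example, not part of the multiprocessing API:

```python
from multiprocessing import Queue


def fifo_roundtrip(items):
    """Put picklable items on a multiprocessing.Queue and read them back."""
    q = Queue()
    for item in items:
        q.put(item)  # each item is pickled and sent through a pipe
    # get() blocks until an item arrives; the timeout guards against hanging
    return [q.get(timeout=5) for _ in items]


if __name__ == "__main__":
    print(fifo_roundtrip([1, "two", {"three": 3}]))
```

With a single producer, the items should come back in the order they were put.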
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
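You are trying to grow your queue without bounds, and you are joining a subprocess that is itself blocked waiting for room in the queue (a process that has put items on a queue will not exit until its buffered items have been flushed to the underlying pipe), so your main process stalls waiting for that subprocess to complete, and it never will.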
If you pull data out of the queue before the join it will work fine.
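For the program in the question, where the total number of items is known up front, a minimal sketch of that fix could read exactly that many items and only then join. This is a Python 3 rewrite, and the `run` wrapper, its parameter names, and the 60-second timeout are my own assumptions rather than part of the original code:

```python
from multiprocessing import Process, Queue


def f(q, n_min, n_max):
    """Worker from the question: put every integer in [n_min, n_max) on q."""
    for i in range(n_min, n_max):
        q.put(i)


def run(n_entries=100000, n_procs=4):
    """Start the workers, drain the queue, then join. Returns the items read."""
    per_proc = n_entries // n_procs
    expected = per_proc * n_procs  # any remainder is dropped, as in the question
    q = Queue()
    processes = [
        Process(target=f, args=(q, i * per_proc, (i + 1) * per_proc))
        for i in range(n_procs)
    ]
    for p in processes:
        p.start()
    # Read BEFORE joining so the workers can flush their buffers and exit.
    # The timeout turns a crashed worker into queue.Empty instead of a hang.
    results = [q.get(timeout=60) for _ in range(expected)]
    for p in processes:
        p.join()
    return results


if __name__ == "__main__":
    print(len(run()))
```

Items from different workers arrive interleaved, so sort the result if the order matters.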
One technique you could use is something like this:
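while 1:
    # Check liveness first, then drain, so nothing put just before exit is missed
    running = any(p.is_alive() for p in processes)
    while not q.empty():
        process_queue_data() # placeholder: should q.get() an item and handle it
    if not running:
        break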
According to the documentation, p.is_alive() should perform an implicit join on a process that has finished, but the documentation also appears to imply that the best practice is to explicitly join all the processes after this.
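Putting the loop and the explicit joins together, a self-contained Python 3 sketch might look like the following. The `producer` worker, the `handle` callback, and the 0.1-second timeout are hypothetical, and `q.get(timeout=...)` is swapped in for the `empty()` check so that the loop does not spin while it waits:

```python
import queue  # stdlib module, used here only for the queue.Empty exception
from multiprocessing import Process, Queue


def producer(q, n):
    """Hypothetical worker: put the integers 0..n-1 on the queue."""
    for i in range(n):
        q.put(i)


def drain_then_join(q, processes, handle):
    """Drain q while any process is alive, then join every process explicitly."""
    while True:
        # Sample liveness first, so items put just before exit are still drained.
        running = any(p.is_alive() for p in processes)
        while True:
            try:
                handle(q.get(timeout=0.1))
            except queue.Empty:
                break
        if not running:
            break
    for p in processes:
        p.join()  # explicit join, as the documentation recommends


if __name__ == "__main__":
    q = Queue()
    procs = [Process(target=producer, args=(q, 1000)) for _ in range(4)]
    for p in procs:
        p.start()
    items = []
    drain_then_join(q, procs, items.append)
    print(len(items))
```

Because the queue is emptied before the joins, each `join()` returns promptly instead of deadlocking.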
Edit: Although that is pretty clear, it may not be all that performant. How you make it perform better will be highly task and machine specific (and in general, you shouldn't be creating that many processes at a time, anyway, unless some are going to be blocked on I/O).
Besides reducing the number of processes to the number of CPUs, some easy fixes to make it a bit faster (again, depending on circumstances) might look like this:
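import time
from Queue import Empty # Python 2 module name (queue in Python 3); multiprocessing.Queue has no Empty attribute

liveprocs = list(processes)
while liveprocs:
    try:
        while 1:
            process_queue_data(q.get(False)) # non-blocking get; raises Empty when drained
    except Empty:
        pass

    time.sleep(0.5) # Give tasks a chance to put more data in
    if not q.empty():
        continue
    liveprocs = [p for p in liveprocs if p.is_alive()]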
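As for sizing the number of processes to the number of CPUs, one possible way to carve up the work is sketched below in Python 3. The `split_range` helper is hypothetical (it is not part of multiprocessing), and unlike the integer division in the question it spreads the remainder over the first few chunks so that no entries are dropped:

```python
import multiprocessing


def split_range(n_entries, n_procs=None):
    """Split range(n_entries) into contiguous (start, stop) chunks."""
    if n_procs is None:
        n_procs = multiprocessing.cpu_count()  # one worker per CPU by default
    n_procs = max(1, min(n_procs, n_entries))  # never more workers than items
    base, extra = divmod(n_entries, n_procs)
    bounds = []
    start = 0
    for i in range(n_procs):
        stop = start + base + (1 if i < extra else 0)  # spread the remainder
        bounds.append((start, stop))
        start = stop
    return bounds


if __name__ == "__main__":
    print(split_range(10, 3))
```

Each `(start, stop)` pair can then be passed to a worker as its `nMin` and `nMax` arguments.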