I'm trying to parallelize one of my long-running tasks. For some reason, it just won't finish and hangs forever.
import multiprocessing as mp

class PartitionedResult(object):
    index = 0
    P = []

    def __init__(self, index, P):
        self.index = index
        self.P = P

def longRunningTask(index, output):
    P = []
    for i in range(0, 1000):
        print(i)
        P.append(i)

    print("I'm done!")
    output.put(PartitionedResult(index, P))
    return

def main():
    output = mp.Queue()
    processes = [mp.Process(target=longRunningTask, args=(x,output,)) for x in range(4)]
    for p in processes:
        p.start()

    for p in processes:
        p.join()

    results = [output.get() for p in processes]
    print("This never shows up")

if __name__ == '__main__':
    main()
It prints the numbers 0-999 for each of the 4 processes and even reaches the "I'm done!" line, but it never gets to the results = [output.get() for p in processes] line.
If I reduce the range of the for loop to, let's say, range(0,50), it suddenly works.
What's the problem here?
Edit: I'm using Python 3.4 on Windows 10, I tried it on 2 different computers and with the pycache deleted.
You are calling join() on all the processes before you get() the results. A process that has put items on a Queue waits before exiting until all of its buffered data has been flushed to the underlying pipe, and that flush blocks once the pipe is full. If you join() a process blocked that way from your consumer process, you have a deadlock: the child can only exit after all its data has been written, and nothing is reading it yet. That is also why range(0,50) works: the smaller result fits in the pipe without blocking.
Move the calls to join() to the end of your main(), then it should work:
def main():
    output = mp.Queue()
    processes = [mp.Process(target=longRunningTask, args=(x,output,)) for x in range(4)]
    for p in processes:
        p.start()

    results = [output.get() for p in processes]
    print("This never shows up")

    for p in processes:
        p.join()
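
For anyone who wants to try the drain-then-join ordering in isolation, here is a minimal self-contained sketch. It is not the asker's exact code: the names long_running_task and drain are made up for illustration, the result is a plain (index, payload) tuple rather than PartitionedResult, and the payload size of 100000 is an arbitrary choice meant to be large enough to fill the pipe.

```python
import multiprocessing as mp


def long_running_task(index, output):
    # Hypothetical stand-in for the real work: build a payload large
    # enough that it would not fit in the pipe behind the queue.
    payload = list(range(100000))
    output.put((index, payload))


def drain(output, count):
    """Fetch exactly `count` items, blocking until each one arrives."""
    return [output.get() for _ in range(count)]


def main():
    output = mp.Queue()
    processes = [mp.Process(target=long_running_task, args=(x, output))
                 for x in range(4)]
    for p in processes:
        p.start()

    # Drain first: reading lets each child finish flushing and exit.
    results = drain(output, len(processes))

    # Joining is safe now, because no child is blocked on a full pipe.
    for p in processes:
        p.join()

    # Items arrive in completion order, so sort by the worker index.
    results.sort(key=lambda item: item[0])
    print([(index, len(payload)) for index, payload in results])


if __name__ == "__main__":
    main()
```

Swapping the drain and join steps back should reproduce the hang with a payload this size, which makes it a quick way to confirm the explanation on your own machine.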
If a process appears hung or deadlocked, it can be useful to be able to kill it forcibly. Calling terminate() on a process object kills the child process. Basic example:
import multiprocessing
import time

def slow_worker():
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')

if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE:', p, p.is_alive())

    p.start()
    print('DURING:', p, p.is_alive())

    # is_alive() may still report True right after terminate() returns
    p.terminate()
    print('TERMINATED:', p, p.is_alive())
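
In practice you usually want to give the process a chance to finish before killing it. The following is a rough sketch of that pattern, combining join() with a timeout and terminate(). The helper name stop_with_timeout, the stuck_worker function, and the 0.5 second timeout are all made up for illustration. Keep in mind that terminate() gives the child no chance to clean up, and a queue or pipe the child was using at that moment may be left corrupted.

```python
import multiprocessing
import time


def stuck_worker():
    # Hypothetical stand-in for a hung task: sleeps far longer
    # than the caller is willing to wait.
    time.sleep(60)


def stop_with_timeout(process, timeout):
    """Wait up to `timeout` seconds, then terminate if still alive.

    Returns True if the process had to be terminated.
    """
    process.join(timeout)
    if process.is_alive():
        process.terminate()
        # Join again so the child is reaped and its exitcode is set.
        process.join()
        return True
    return False


if __name__ == '__main__':
    p = multiprocessing.Process(target=stuck_worker)
    p.start()
    killed = stop_with_timeout(p, timeout=0.5)
    print('terminated:', killed, 'alive:', p.is_alive(), 'exitcode:', p.exitcode)
```

The second join() matters: without it, is_alive() can still report True for a short time after terminate() returns.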