I am trying to exit a multiprocessing script when an error is thrown by the target function, but instead of quitting, the parent process just hangs.
This is the test script I use to replicate the problem:
#!/usr/bin/python3.5
import time, multiprocessing as mp

def myWait(wait, resultQueue):
    startedAt = time.strftime("%H:%M:%S", time.localtime())
    time.sleep(wait)
    endedAt = time.strftime("%H:%M:%S", time.localtime())
    name = mp.current_process().name
    resultQueue.put((name, wait, startedAt, endedAt))
# queue initialisation
resultQueue = mp.Queue()

# process creation args: (process name, sleep time, queue)
proc = [
    mp.Process(target=myWait, name=' _One_', args=(2, resultQueue,)),
    mp.Process(target=myWait, name=' _Two_', args=(2, resultQueue,))
]

# starting processes
for p in proc:
    p.start()
for p in proc:
    p.join()

# print results
for p in proc:
    name, wait, startedAt, endedAt = resultQueue.get()
    print('Process %s started at %s wait %s ended at %s' % (name, startedAt, wait, endedAt))
This works perfectly: I can see the parent script spawning its two child processes in htop. But when I try to force the parent script to exit if an error is thrown in the myWait target function, the parent process just hangs and doesn't even seem to spawn any child processes. I have to Ctrl-C to kill it.
def myWait(wait, resultQueue):
    try:
        1 / 0  # do something wrong
    except:
        raise SystemExit
I have tried every way to exit the function (e.g. exit(), sys.exit(), os._exit()...) to no avail.
Firstly, your code has a major issue: you're joining the processes before flushing the contents of the queue, which can result in a deadlock. See the section titled 'Joining processes that use queues' here: https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
Secondly, the call to resultQueue.get() will block until it receives some data, which never happens if an exception is raised in the myWait function before any data has been pushed onto the queue. So make the call non-blocking and check for data in a loop until something arrives or the process is no longer alive.
Here's a quick'n'dirty fix to give you the idea:
#!/usr/bin/python3.5
import multiprocessing as mp
import queue
import time

def myWait(wait, resultQueue):
    raise Exception("error!")

# queue initialisation
resultQueue = mp.Queue()

# process creation args: (process name, sleep time, queue)
proc = [
    mp.Process(target=myWait, name=' _One_', args=(2, resultQueue,)),
    mp.Process(target=myWait, name=' _Two_', args=(2, resultQueue,))
]

# starting processes
for p in proc:
    p.start()

# print results
for p in proc:
    while True:
        if not p.is_alive():
            break
        try:
            name, wait, startedAt, endedAt = resultQueue.get(block=False)
            print('Process %s started at %s wait %s ended at %s'
                  % (name, startedAt, wait, endedAt))
            break
        except queue.Empty:
            pass

for p in proc:
    p.join()
The function myWait will throw an exception, but both processes will still be joined and the program will exit cleanly.
You should use multiprocessing.Pool to manage your processes for you, and then use Pool.imap_unordered to iterate over the results in the order they are completed. As soon as you get the first exception, you can stop the pool and its child processes (this happens automatically when you exit the with Pool() as pool block). E.g.:
from multiprocessing import Pool
import time

def my_wait(args):
    name, wait = args
    if wait == 2:
        raise ValueError("error!")
    else:
        startedAt = time.strftime("%H:%M:%S", time.localtime())
        time.sleep(wait)
        endedAt = time.strftime("%H:%M:%S", time.localtime())
        return name, wait, startedAt, endedAt

if __name__ == "__main__":
    try:
        with Pool() as pool:
            args = [["_One_", 2], ["_Two_", 3]]
            for name, wait, startedAt, endedAt in pool.imap_unordered(my_wait, args):
                print('Task %s started at %s wait %s ended at %s'
                      % (name, startedAt, wait, endedAt))
    except ValueError as e:
        print(e)
This method is less suitable for long-running, low-workload tasks, as it only runs as many tasks in parallel as the pool has worker processes (though that number is configurable via Pool(processes=N)). It's also not great if you need to run different functions.