
Exit a multiprocessing script

I am trying to exit a multiprocessing script when an error is thrown by the target function, but instead of quitting, the parent process just hangs.

This is the test script I use to replicate the problem:

#!/usr/bin/python3.5

import time, multiprocessing as mp

def myWait(wait, resultQueue):
    startedAt = time.strftime("%H:%M:%S", time.localtime())
    time.sleep(wait)
    endedAt = time.strftime("%H:%M:%S", time.localtime())
    name = mp.current_process().name
    resultQueue.put((name, wait, startedAt, endedAt))

# queue initialisation
resultQueue = mp.Queue()

# process creation arg: (process number, sleep time, queue)
proc =  [
    mp.Process(target=myWait, name = ' _One_', args=(2, resultQueue,)),
    mp.Process(target=myWait, name = ' _Two_', args=(2, resultQueue,))
    ]

# starting processes
for p in proc:
    p.start()

for p in proc:
    p.join()

# print results
results = {}
for p in proc:
    name, wait, startedAt, endedAt = resultQueue.get()
    print('Process %s started at %s wait %s ended at %s' % (name, startedAt, wait, endedAt))

This works perfectly: I can see the parent script spawning two child processes in htop. But when I try to force the parent script to exit if an error is thrown in the myWait target function, the parent process just hangs and doesn't even spawn any child processes. I have to ctrl-c to kill it.

def myWait(wait, resultQueue):
    try:
        1 / 0  # do something wrong
    except:
        raise SystemExit

I have tried every way to exit the function (e.g. exit(), sys.exit(), os._exit()...) to no avail.

asked Aug 31 '16 by ripat


People also ask

How do you close a multiprocessing process?

A process can be killed by calling its terminate() method. The call only terminates the target process, not its child processes. The method is called on the multiprocessing.Process instance.
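As a minimal sketch (the worker function and the 60-second sleep are made up for illustration):

import multiprocessing as mp
import time

def worker():
    time.sleep(60)  # stand-in for a long-running task

if __name__ == "__main__":
    p = mp.Process(target=worker, name="worker")
    p.start()
    p.terminate()      # forcefully stop this child process (its own children are not touched)
    p.join()           # reap the terminated process
    print(p.exitcode)  # negative value: killed by a signal (e.g. -15 on Unix)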

How do I close multiprocessing in Python?

You can shut down a process pool via the Pool.close() or Pool.terminate() methods.
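A rough sketch of the usual close/join sequence (the square function is just a placeholder):

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    pool = Pool(processes=2)
    print(pool.map(square, [1, 2, 3]))  # [1, 4, 9]
    pool.close()       # no more tasks may be submitted
    # pool.terminate() # alternative: stop the workers immediately
    pool.join()        # wait for the worker processes to exit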

How do you exit a process in Python?

You can call sys.exit() to exit a process.
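Note that sys.exit() in a child process only ends that child; the parent keeps running and can inspect the exit code. A minimal sketch (the exit code 1 is arbitrary):

import sys, multiprocessing as mp

def worker():
    sys.exit(1)  # ends only this child process

if __name__ == "__main__":
    p = mp.Process(target=worker)
    p.start()
    p.join()
    print(p.exitcode)  # 1 -- the parent is unaffected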


2 Answers

Firstly, your code has a major issue: you're joining the processes before draining the contents of the queue, which can result in a deadlock. See the section titled 'Joining processes that use queues' here: https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming

Secondly, the call to resultQueue.get() will block until it receives some data, which never happens if an exception is raised in the myWait function before any data has been pushed onto the queue. So make it non-blocking and check for data in a loop until something is received or the process turns out to be dead.

Here's a quick'n'dirty fix to give you the idea:

#!/usr/bin/python3.5

import multiprocessing as mp
import queue
import time

def myWait(wait, resultQueue):
    raise Exception("error!")

# queue initialisation
resultQueue = mp.Queue()

# process creation arg: (process number, sleep time, queue)
proc =  [
    mp.Process(target=myWait, name = ' _One_', args=(2, resultQueue,)),
    mp.Process(target=myWait, name = ' _Two_', args=(2, resultQueue,))
    ]

# starting processes
for p in proc:
    p.start()

# print results
results = {}
for p in proc:
    while True:
        if not p.is_alive():
            break

        try:
            name, wait, startedAt, endedAt = resultQueue.get(block=False)
            print('Process %s started at %s wait %s ended at %s'
                  % (name, startedAt, wait, endedAt))
            break
        except queue.Empty:
            pass

for p in proc:
    p.join()

The function myWait will throw an exception but both processes will still join and the program will exit nicely.
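If you also want the parent to exit with a failure status when a child dies, one possible extension (not part of the answer above) is to check each process's exitcode after joining, assuming a non-zero exitcode means the child failed:

import sys, multiprocessing as mp

def myWait(wait, resultQueue):
    raise Exception("error!")

if __name__ == "__main__":
    resultQueue = mp.Queue()
    proc = [mp.Process(target=myWait, args=(2, resultQueue)) for _ in range(2)]
    for p in proc:
        p.start()
    for p in proc:
        p.join()
    # an uncaught exception in a child gives it a non-zero exitcode
    if any(p.exitcode != 0 for p in proc):
        sys.exit('a child process failed, aborting')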

answered Oct 03 '22 by ChristopherC


You should use multiprocessing.Pool to manage your processes for you, and then use Pool.imap_unordered to iterate over the results in the order they complete. As soon as you get the first exception, you can stop the pool and its child processes (this happens automatically when you exit the with Pool() as pool block). For example:

from multiprocessing import Pool
import time

def my_wait(args):
    name, wait = args
    if wait == 2:
        raise ValueError("error!")
    else:
        startedAt = time.strftime("%H:%M:%S", time.localtime())
        time.sleep(wait)
        endedAt = time.strftime("%H:%M:%S", time.localtime())
        return name, wait, startedAt, endedAt

if __name__ == "__main__":
    try:
        with Pool() as pool:
            args = [["_One_", 2], ["_Two_", 3]]
            for name, wait, startedAt, endedAt in pool.imap_unordered(my_wait, args):     
                print('Task %s started at %s wait %s ended at %s' % (name,
                    startedAt, wait, endedAt))
    except ValueError as e:
        print(e)

This method is not well suited to long-running, low-workload tasks, as the pool will only run as many tasks in parallel as it has worker processes (though that number is configurable). It's also not great if you need to run different functions.
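If you do need to run different functions, one possible variation (not from the answer above; task_a and task_b are made-up examples) is to submit them individually with Pool.apply_async and let the parent re-raise any child exception when it fetches the results:

from multiprocessing import Pool
import time

def task_a(wait):
    time.sleep(wait)
    return "task_a done"

def task_b(wait):
    raise ValueError("task_b failed")

if __name__ == "__main__":
    with Pool() as pool:
        async_results = [pool.apply_async(task_a, (1,)),
                         pool.apply_async(task_b, (1,))]
        try:
            for res in async_results:
                print(res.get())  # re-raises the child's exception in the parent
        except ValueError as e:
            print(e)  # leaving the with-block then terminates the remaining workers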

answered Oct 03 '22 by Dunes