Python Multiprocessing Early Termination

While my script is running, it is possible that an error may occur at some point. In that case, all processes should be properly terminated, an error message should be returned, and the script should exit.

The code I have now does not yet meet those requirements. When an error occurs, it is passed to report_error(), but the script then hangs in Terminal, and Activity Monitor shows many Python processes still running.

Environment

  • Mac OS X 10.8.5
  • Python 3.3.3

What is the correct way to terminate all processes from any point in the script?

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import sys
from multiprocessing import Pool


# Global variables.

input_files = [
    'test_data_0.csv',
    'test_data_1.csv'
]


def report_error(error):

    # Reports errors then exits script.
    print("Error: {0}".format(error), file=sys.stderr)
    sys.exit(1)

    # What I really want is to report the error, properly terminate all processes,
    # and then exit the script.


def read_file(file):

    try:
        # Read file into list (file-reading details elided in the question).
        pass
    except Exception as error:
        report_error(error)


def check_file(file):

    # Do some error checking on file (details elided in the question).
    error = None  # Placeholder: set to an error if the check fails.
    if error:
        report_error(error)


def job(file):

    # Executed on each item in input_files.

    check_file(file)
    read_file(file)


def main():

    # Sets up a process pool. Defaults to number of cores.
    # Each input gets passed to job and processed in a separate process.
    p = Pool()
    p.map(job, input_files)

    # Closing and joining a pool is important to ensure all resources are freed properly.
    p.close()
    p.join()


if __name__ == '__main__':
    main()
asked Sep 11 '14 by fire_water


1 Answer

First, using sys.exit() to kill the child worker process will actually break the pool and make the map call hang forever. multiprocessing currently does not recover properly when a worker process crashes while it is processing a job (there is a bug report with a patch that addresses this issue here, for what it's worth).
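To see the distinction, here is a minimal standalone sketch (not the asker's code; the job function and inputs are invented for illustration) showing that an ordinary exception raised in a worker is pickled, sent back to the parent, and re-raised there, where it can be caught normally, unlike the SystemExit from sys.exit(), which kills the worker outright:

import sys
from multiprocessing import Pool


def job(x):
    if x == 2:
        # A plain exception crosses the process boundary cleanly:
        # the pool pickles it and re-raises it in the parent.
        raise ValueError("bad input: {0}".format(x))
    return x * x


if __name__ == '__main__':
    p = Pool()
    try:
        p.map(job, range(4))
    except ValueError as error:
        print("Error: {0}".format(error), file=sys.stderr)
        p.terminate()
    else:
        p.close()
    p.join()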

There are a couple of ways to do what you actually want. Since it seems you don't care about the values returned from the worker functions, the easiest is to use imap_unordered instead of map, raise an exception from the worker when there's a failure, and then iterate over the iterator that imap_unordered returns:

def report_error(error):

    # Report the error, then re-raise it so the pool
    # propagates it back to the parent process.
    print("Error: {0}".format(error), file=sys.stderr)
    raise error

...

def main():
    p = Pool()
    try:
        list(p.imap_unordered(job, input_files))
    except Exception:
        print("a worker failed, aborting...")
        p.close()
        p.terminate()
    else:
        p.close()
        p.join()

if __name__ == '__main__':
    main()

With imap_unordered, results are returned to the parent as soon as each child sends them. So if a child sends an exception back, it is immediately re-raised in the parent process. We catch that exception, print a message, and then terminate the pool, which kills all remaining workers.
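A variant worth knowing (not from the original answer, though the API does exist in Python 3.3) is map_async with an error_callback: the callback runs in the parent as soon as any worker raises, so you can terminate the pool from there instead of iterating over results. A sketch, reusing the job and input_files names from the question:

import sys
from multiprocessing import Pool


def main():
    p = Pool()

    def abort(error):
        # Called in the parent process as soon as any worker raises.
        print("Error: {0}".format(error), file=sys.stderr)
        p.terminate()

    p.map_async(job, input_files, error_callback=abort)
    p.close()  # No more tasks will be submitted.
    p.join()   # Returns once workers finish or terminate() kills them.


if __name__ == '__main__':
    main()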

answered Nov 07 '22 by dano