
How do I run cleanup code in a Python multiprocessing Pool?

I have some Python code (on Windows) that uses the multiprocessing module to run a pool of worker processes. Each worker process needs to do some cleanup when the map_async call finishes.

Does anyone know how to do that?

asked May 20 '11 by Dave

People also ask

How do you close a multiprocessing process in Python?

Terminating processes in Python: we can kill or terminate a process immediately by using its terminate() method. This stops the child process before it has finished its work.
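For example, a minimal sketch (the busy loop below is just a placeholder for a child that never finishes on its own):

import multiprocessing
import time

def busy():
    # Placeholder for a child process that would never exit by itself.
    while True:
        time.sleep(1)

if __name__ == '__main__':
    p = multiprocessing.Process(target=busy)
    p.start()
    p.terminate()      # kill the child immediately
    p.join()
    print(p.exitcode)  # nonzero: the child did not exit normally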

How does Python multiprocess work?

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
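As a rough sketch of that parallel API (the work function is made up for illustration):

import multiprocessing

def work(name):
    print('hello from', name)

if __name__ == '__main__':
    # Same start()/join() interface as threading.Thread, but the target
    # runs in a separate process with its own interpreter and GIL.
    p = multiprocessing.Process(target=work, args=('child',))
    p.start()
    p.join()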

How does pool Apply_async work?

apply_async() submits a target function to the process pool. The call does not block: it immediately returns an AsyncResult object, which can be ignored if the function does not return a value you care about.
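A minimal sketch, assuming a trivial square task:

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    # Returns immediately with an AsyncResult; the task runs in the background.
    res = pool.apply_async(square, (7,))
    print(res.get())  # get() blocks until the result is ready: 49
    pool.close()
    pool.join()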


2 Answers

Do you really want to run a cleanup function once for each worker process rather than once for every task created by the map_async call?

multiprocessing.pool.Pool creates a pool of, say, 8 worker processes. map_async might submit 40 tasks to be distributed among those 8 workers. I can imagine why you might want to run cleanup code at the end of each task, but I'm having trouble imagining why you would want to run cleanup code just before each of the 8 worker processes is finalized.
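If per-task cleanup is what you are after, a try/finally inside the task function is all it takes; here is a minimal sketch (foo and the pool size are placeholders):

import multiprocessing as mp

def foo(i):
    try:
        return i * i
    finally:
        # Runs in the worker after every task, whether foo succeeded or raised.
        print('cleanup after task', i)

if __name__ == '__main__':
    pool = mp.Pool(4)
    print(pool.map_async(foo, range(8)).get())
    pool.close()
    pool.join()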

Nevertheless, if per-worker cleanup is really what you want, you could do it by monkeypatching multiprocessing.pool.worker:

import multiprocessing as mp
import multiprocessing.pool as mpool
from multiprocessing.util import debug

def cleanup():
    print('{n} CLEANUP'.format(n=mp.current_process().name))

# This code comes from /usr/lib/python2.6/multiprocessing/pool.py,
# except for the single line at the end which calls cleanup().
def myworker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            result = (False, e)
        put((job, i, result))
    cleanup()

# Here we monkeypatch mpool.worker
mpool.worker = myworker

def foo(i):
    return i*i

def main():
    pool = mp.Pool(8)
    results = pool.map_async(foo, range(40)).get()
    print(results)

if __name__ == '__main__':
    main()

yields:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]
PoolWorker-8 CLEANUP
PoolWorker-3 CLEANUP
PoolWorker-7 CLEANUP
PoolWorker-1 CLEANUP
PoolWorker-6 CLEANUP
PoolWorker-2 CLEANUP
PoolWorker-4 CLEANUP
PoolWorker-5 CLEANUP
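Note that the exact signature of multiprocessing.pool.worker differs between Python versions (Python 3 adds maxtasks and wrap_exception parameters, for example), so if you go this route, copy the worker source from the interpreter you are actually running and append the cleanup() call there.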
answered Dec 07 '22 by unutbu


Your only real option here is to run cleanup at the end of the function you pass to map_async.

If this cleanup is honestly intended to run at process death, you cannot use the concept of a pool: the two are orthogonal. A pool does not dictate process lifetime unless you use maxtasksperchild, which is new in Python 2.7. Even then, you do not gain the ability to run code at process death. However, maxtasksperchild might suit you, because any resources that a process opens will definitely go away when that process is terminated.
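For illustration, a minimal maxtasksperchild sketch (Python 2.7+; the work function is a placeholder). Each worker is replaced after a single task, so anything it held open is reclaimed by the OS when it exits:

import multiprocessing

def work(i):
    return i * i

if __name__ == '__main__':
    # Each worker process exits after completing one task and a fresh
    # worker takes its place, so per-process resources are released.
    pool = multiprocessing.Pool(processes=4, maxtasksperchild=1)
    print(pool.map(work, range(8)))
    pool.close()
    pool.join()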

That being said, if you have a bunch of functions that you need to run cleanup on, you can save duplication of effort by designing a decorator. Here's an example of what I mean:

import functools
import multiprocessing

def cleanup(f):
    """Decorator for shared cleanup mechanism"""
    @functools.wraps(f)
    def wrapped(arg):
        result = f(arg)
        print("Cleaning up after f({0})".format(arg))
        return result
    return wrapped

@cleanup
def task1(arg):
    print("Hello from task1({0})".format(arg))
    return arg * 2

@cleanup
def task2(arg):
    print("Bonjour from task2({0})".format(arg))
    return arg ** 2

def main():
    p = multiprocessing.Pool(processes=3)
    print(p.map(task1, [1, 2, 3]))
    print(p.map(task2, [1, 2, 3]))

if __name__ == "__main__":
    main()

When you execute this (barring stdout being jumbled, since I'm not locking it here for brevity), the order of the output should show that the cleanup runs at the end of each task:

Hello from task1(1)
Cleaning up after f(1)
Hello from task1(2)
Cleaning up after f(2)
Hello from task1(3)
Cleaning up after f(3)
[2, 4, 6]

Bonjour from task2(1)
Cleaning up after f(1)
Bonjour from task2(2)
Cleaning up after f(2)
Bonjour from task2(3)
Cleaning up after f(3)
[1, 4, 9]
answered Dec 07 '22 by Jed Smith