I have some Python code (on Windows) that uses the multiprocessing module to run a pool of worker processes. Each worker process needs to do some cleanup at the end of the map_async call.
Does anyone know how to do that?
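For reference, a minimal sketch of the kind of setup being described; do_work and its body are placeholders, not taken from the original code:

import multiprocessing

def do_work(item):
    # Placeholder for the real task; the actual work and the cleanup it
    # needs are not shown in the question.
    return item * item

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    results = pool.map_async(do_work, range(40)).get()
    pool.close()
    pool.join()
    # The question: where can each worker process run its own cleanup
    # code before it exits?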
Do you really want to run a cleanup function once for each worker process, rather than once for every task created by the map_async call?

multiprocessing.pool.Pool creates a pool of, say, 8 worker processes; map_async might submit 40 tasks to be distributed among those 8 workers.
I can imagine why you might want to run cleanup code at the end of each task, but I'm having trouble imagining why you would want to run cleanup code just before each of the 8 worker processes is finalized.
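If per-task cleanup is what is actually wanted, a try/finally inside the mapped function is enough; a minimal sketch with placeholder names:

def task_with_cleanup(arg):
    try:
        return arg * arg                     # the actual work
    finally:
        # Runs after every task, inside whichever worker executed it.
        print('cleaning up after task({0})'.format(arg))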
Nevertheless, if that is what you want to do, you could do it by monkeypatching multiprocessing.pool.worker:
import multiprocessing as mp
import multiprocessing.pool as mpool
from multiprocessing.util import debug

def cleanup():
    print('{n} CLEANUP'.format(n=mp.current_process().name))

# This code comes from /usr/lib/python2.6/multiprocessing/pool.py,
# except for the single line at the end which calls cleanup().
def myworker(inqueue, outqueue, initializer=None, initargs=()):
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()
    if initializer is not None:
        initializer(*initargs)
    while 1:
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break
        if task is None:
            debug('worker got sentinel -- exiting')
            break
        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception, e:
            result = (False, e)
        put((job, i, result))
    cleanup()

# Here we monkeypatch mpool.worker
mpool.worker = myworker

def foo(i):
    return i*i

def main():
    pool = mp.Pool(8)
    results = pool.map_async(foo, range(40)).get()
    print(results)

if __name__ == '__main__':
    main()
yields:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]
PoolWorker-8 CLEANUP
PoolWorker-3 CLEANUP
PoolWorker-7 CLEANUP
PoolWorker-1 CLEANUP
PoolWorker-6 CLEANUP
PoolWorker-2 CLEANUP
PoolWorker-4 CLEANUP
PoolWorker-5 CLEANUP
Your only real option here is to run cleanup at the end of the function you pass to map_async.
If this cleanup is honestly intended to run at process death, the pool cannot give it to you; the two concepts are orthogonal. A pool does not dictate its processes' lifetimes unless you use maxtasksperchild, which is new in Python 2.7. Even then, you do not gain the ability to run code at process death. However, maxtasksperchild might suit you, because any resources a process has open will definitely go away when that process is terminated.
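A sketch of what maxtasksperchild looks like in practice (the pool size and the limit of 5 tasks per worker are arbitrary choices, not from the answer): each worker is retired and replaced after running that many tasks, so whatever it held is released when it exits, but no user-supplied hook runs at that moment.

import multiprocessing

def foo(i):
    return i * i

if __name__ == '__main__':
    # Each worker process is replaced after completing 5 tasks; its open
    # resources go away when it exits, but no cleanup callback is invoked.
    pool = multiprocessing.Pool(processes=4, maxtasksperchild=5)
    print(pool.map(foo, range(40)))
    pool.close()
    pool.join()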
That being said, if you have a bunch of functions that you need to run cleanup on, you can save duplication of effort by designing a decorator. Here's an example of what I mean:
import functools
import multiprocessing

def cleanup(f):
    """Decorator for shared cleanup mechanism"""
    @functools.wraps(f)
    def wrapped(arg):
        result = f(arg)
        print("Cleaning up after f({0})".format(arg))
        return result
    return wrapped

@cleanup
def task1(arg):
    print("Hello from task1({0})".format(arg))
    return arg * 2

@cleanup
def task2(arg):
    print("Bonjour from task2({0})".format(arg))
    return arg ** 2

def main():
    p = multiprocessing.Pool(processes=3)
    print(p.map(task1, [1, 2, 3]))
    print(p.map(task2, [1, 2, 3]))

if __name__ == "__main__":
    main()
When you execute this (barring stdout being jumbled, because I'm not locking it here for brevity), the order of the output should indicate that your cleanup task is running at the end of each task:
Hello from task1(1)
Cleaning up after f(1)
Hello from task1(2)
Cleaning up after f(2)
Hello from task1(3)
Cleaning up after f(3)
[2, 4, 6]
Bonjour from task2(1)
Cleaning up after f(1)
Bonjour from task2(2)
Cleaning up after f(2)
Bonjour from task2(3)
Cleaning up after f(3)
[1, 4, 9]
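Since the original question used map_async rather than map, note that the same decorated tasks work there too; a sketch assuming the cleanup decorator and task1 defined above:

def main():
    p = multiprocessing.Pool(processes=3)
    async_result = p.map_async(task1, [1, 2, 3])
    print(async_result.get())  # get() blocks until every task, and its cleanup, has run
    p.close()
    p.join()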