I'm using the python multiprocessing functionality to map some function across some elements. Something along the lines of this:
def computeStuff(arguments, globalData, concurrent=True):
pool = multiprocessing.Pool(initializer=initWorker, initargs=(globalData,))
results = pool.map(workerFunction, list(enumerate(arguments)))
return results
def initWorker(globalData):
workerFunction.globalData = globalData
def workerFunction((index, argument)):
... # computation here
Generally I run tests in ipython using both cPython and Pypy. I have noticed that the spawned processes often don't get killed, so they start accumulating, each using a gig of ram. This happens when hitting ctrl-k during a computation, which sends multiprocessing into a big frenzy of confusion. But even when letting computation finish, those processes won't die in Pypy.
According to the documentation, when the pool gets garbage collected, it should call terminate()
and kill all the processes. What's happening here? Do I have to explicitly call close()
? If yes, is there some sort of context manager that properly manages closing the resources (i.e. processes)?
This is on Mac OS X Yosemite.
PyPy's garbage collection is lazy, so failing to call close
means the Pool
is cleaned "sometime", but that might not mean "anytime soon".
Once the Pool
is properly close
d, the workers exit when they run out of tasks. An easy way to ensure the Pool
is closed in pre-3.3 Python is:
from contextlib import closing
def computeStuff(arguments, globalData, concurrent=True):
with closing(multiprocessing.Pool(initializer=initWorker, initargs=(globalData,))) as pool:
return pool.map(workerFunction, enumerate(arguments))
Note: I also removed the explicit conversion to list
(pointless, since map
will iterate the enumerate
iterator for you), and returned the results directly (no need to assign to a name only to return on the next line).
If you want to ensure immediate termination in the exception case (on pre-3.3 Python), you'd use a try/finally block, or write a simple context manager (which could be reused for other places where you use a Pool
):
from contextlib import contextmanager
@contextmanager
def terminating(obj):
try:
yield obj
finally:
obj.terminate()
def computeStuff(arguments, globalData, concurrent=True):
with terminating(multiprocessing.Pool(initializer=initWorker, initargs=(globalData,))) as pool:
return pool.map(workerFunction, enumerate(arguments))
The terminating
approach is superior in that it guarantees the processes exit immediately; in theory, if you're using threads elsewhere in your main program, the Pool
workers might be forked with non-daemon threads, which would keep the processes alive even when the worker task thread exited; terminating
hides this by killing the processes forcibly.
If your interpreter is Python 3.3 or higher, the terminating
approach is built-in to Pool
, so no special wrapper is needed for the with
statement, with multiprocessing.Pool(initializer=initWorker, initargs=(globalData,)) as pool:
works directly.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With