In py2.6+, the multiprocessing module offers a Pool class, so one can do:
class Volatile(object):
    def do_stuff(self, ...):
        pool = multiprocessing.Pool()
        return pool.imap(...)
However, with the standard Python implementation at 2.7.2, this approach soon leads to "IOError: [Errno 24] Too many open files". Apparently the pool object never gets garbage collected, so its processes never terminate, accumulating whatever descriptors are opened internally. I think this is the case because the following works:
class Volatile(object):
    def do_stuff(self, ...):
        pool = multiprocessing.Pool()
        result = pool.map(...)
        pool.terminate()
        return result
I would like to keep the "lazy" iterator approach of imap; how does the garbage collector work in that case, and how can I fix the code?
Introduction. multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
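As a minimal sketch of that similarity (the function name work and its argument are illustrative), starting a process looks almost exactly like starting a thread:

import multiprocessing

def work(n):
    # Runs in a separate process, so it is not constrained by the GIL.
    print(n * n)

if __name__ == '__main__':
    p = multiprocessing.Process(target=work, args=(10,))
    p.start()
    p.join()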
Garbage collection (GC) is a memory-recovery feature built into programming languages such as Python, C#, and Java. A GC-enabled programming language includes one or more garbage collectors (GC engines) that automatically free memory that was allocated to objects the program no longer needs.
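In CPython specifically, most memory is reclaimed by reference counting the moment the last reference disappears; the gc module's collector only has to clean up reference cycles. A small sketch:

import gc

class Node(object):
    pass

a, b = Node(), Node()
a.other, b.other = b, a   # build a reference cycle
del a, b                  # refcounting alone cannot free the cycle now
print(gc.collect())       # the cyclic collector reclaims it and reports how many unreachable objects it found
print(gc.garbage)         # [] unless collection was blocked (e.g. __del__ cycles on Python 2)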
So, multiprocessing is faster when the program is CPU-bound. When a program does a lot of I/O, threading may be more efficient, because most of the time it is simply waiting for the I/O to complete. For CPU-heavy work, however, multiprocessing generally wins because the worker processes run truly in parallel rather than taking turns under the GIL.
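As a hedged illustration of that trade-off, multiprocessing.dummy exposes the same Pool API backed by threads, which often suits I/O-bound work such as fetching URLs (the worker and URL list below are illustrative):

from multiprocessing.dummy import Pool as ThreadPool  # thread-backed, same API as multiprocessing.Pool
import urllib2                                         # urllib.request on Python 3

def fetch(url):
    return urllib2.urlopen(url).read()   # the thread spends most of its time waiting on the network

if __name__ == '__main__':
    urls = ['http://example.com'] * 8
    thread_pool = ThreadPool(8)
    pages = thread_pool.map(fetch, urls)  # the 8 downloads overlap instead of running one after another
    thread_pool.terminate()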
Multiprocessing Manager provides a way of creating centralized Python objects that can be shared safely among processes. Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines.
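A minimal sketch of a Manager-backed shared dict (the worker function record is illustrative):

import multiprocessing

def record(shared, key, value):
    shared[key] = value               # writes go through the manager's server process

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared = manager.dict()           # proxy object, safe to hand to child processes
    procs = [multiprocessing.Process(target=record, args=(shared, i, i * i))
             for i in range(4)]
    for p in procs: p.start()
    for p in procs: p.join()
    print(dict(shared))               # {0: 0, 1: 1, 2: 4, 3: 9}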
Indeed, even when all user references to the pool object are deleted, no tasks remain in the queue, and all garbage collection has run, the processes still stay as unusable zombies in the operating system - plus we have 3 zombie service threads from Pool hanging around (Python 2.7 and 3.4):
>>> del pool
>>> gc.collect()
0
>>> gc.garbage
[]
>>> threading.enumerate()
[<_MainThread(MainThread, started 5632)>, <Thread(Thread-8, started daemon 5252)>,
<Thread(Thread-9, started daemon 5260)>, <Thread(Thread-7, started daemon 7608)>]
And each further Pool() will add more and more process and thread zombies... which stay until the main process is terminated. It takes a special poke to stop such a zombie pool - via its service thread _handle_workers:
>>> ths = threading.enumerate()
>>> for th in ths:
... try: th.name, th._state, th._Thread__target
... except AttributeError: pass
...
('MainThread', 1, None)
('Thread-8', 0, <function _handle_tasks at 0x01462A30>)
('Thread-9', 0, <function _handle_results at 0x014629F0>)
('Thread-7', 0, <function _handle_workers at 0x01462A70>)
>>> ths[-1]._state = multiprocessing.pool.CLOSE # or TERMINATE
>>> threading.enumerate()
[<_MainThread(MainThread, started 5632)>]
>>>
That terminates the other service threads and also terminates the child processes.
I think one problem is that there is a resource leak bug in the Python library, which could be fixed by proper use of weakref's.
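That bug aside, a user-level workaround along those lines is to tie the pool's shutdown to the lifetime of the returned iterator - a sketch only, assuming Python 3.4+ for weakref.finalize; the worker function is illustrative:

import multiprocessing
import weakref

def work(item):                        # must be a module-level (picklable) function
    return item * item

class Volatile(object):
    def do_stuff(self, items):
        pool = multiprocessing.Pool()
        it = pool.imap(work, items)
        # Once the caller drops the iterator, terminate the pool so its worker
        # processes (and their file descriptors) are released promptly.
        weakref.finalize(it, pool.terminate)
        return it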
The other point is that Pool creation & termination is expensive (including 3 service threads per pool just for management!), and there is usually no reason to have many more worker processes than CPU cores (for high CPU loads) or more than a limited number dictated by some other limiting resource (e.g. network bandwidth). So it's reasonable to treat a pool more like a single app-global resource (optionally managed by a timeout) rather than a short-lived object held by a closure (or used with a terminate()-workaround because of the bug).
For example:
import atexit
from multiprocessing import Pool, cpu_count

CPUCORES = cpu_count()   # bound the worker count by the number of CPU cores

try:
    _unused = pool  # reload-safe global var
except NameError:
    pool = None

def get_pool():
    global pool
    if pool is None:
        atexit.register(stop_pool)
        pool = Pool(CPUCORES)
    return pool

def stop_pool():
    global pool
    if pool:
        pool.terminate()
        pool = None
In Python you basically have no guarantee of when things will be destroyed, and in any case this is not how multiprocessing pools are designed to be used.
The right thing to do is to share a single pool across multiple calls to the function. The easiest way to do that is to store the pool as a class (or, perhaps, instance) variable:
class Dispatcher:
    pool = multiprocessing.Pool()

    def do_stuff(self, ...):
        result = self.pool.map(...)
        return result
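For completeness, a hypothetical end-to-end usage (the square worker is illustrative). Note that creating the pool in the class body assumes a fork start method; under spawn (e.g. on Windows) it is safer to create the pool lazily on first use:

import multiprocessing

def square(x):                        # module-level so it can be pickled by the pool
    return x * x

class Dispatcher:
    pool = multiprocessing.Pool()     # one pool, created when the class body runs

    def do_stuff(self, items):
        return self.pool.map(square, items)

if __name__ == '__main__':
    print(Dispatcher().do_stuff(range(10)))   # [0, 1, 4, 9, ..., 81]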