
multiprocessing and garbage collection

In Python 2.6+, the multiprocessing module offers a Pool class, so one can do:

class Volatile(object):
    def do_stuff(self, ...):
        pool = multiprocessing.Pool()
        return pool.imap(...)

However, with the standard Python implementation (CPython 2.7.2), this approach soon leads to "IOError: [Errno 24] Too many open files". Apparently the pool object never gets garbage collected, so its processes never terminate, accumulating whatever file descriptors are opened internally. I think this is the case because the following works:

class Volatile(object):
    def do_stuff(self, ...):
        pool = multiprocessing.Pool()
        result = pool.map(...)
        pool.terminate()
        return result

I would like to keep the "lazy" iterator approach of imap; how does the garbage collector work in that case? How can I fix the code?

asked Mar 31 '12 by user124114



2 Answers

Indeed, even when all user references to the pool object are deleted, no tasks remain in the queue, and all garbage collection is done, the worker processes still remain as unusable zombies in the operating system, and three zombie service threads from the Pool keep hanging around (Python 2.7 and 3.4):

>>> del pool
>>> gc.collect()
0
>>> gc.garbage
[]
>>> threading.enumerate()
[<_MainThread(MainThread, started 5632)>, <Thread(Thread-8, started daemon 5252)>, 
 <Thread(Thread-9, started daemon 5260)>, <Thread(Thread-7, started daemon 7608)>]

Further Pool() creations will add more and more process and thread zombies, which stay until the main process is terminated.

Stopping such a zombie pool requires a special poke via its _handle_workers service thread:

>>> ths = threading.enumerate()
>>> for th in ths: 
...     try: th.name, th._state, th._Thread__target
...     except AttributeError: pass
...     
('MainThread', 1, None)
('Thread-8', 0, <function _handle_tasks at 0x01462A30>)
('Thread-9', 0, <function _handle_results at 0x014629F0>)
('Thread-7', 0, <function _handle_workers at 0x01462A70>)
>>> ths[-1]._state = multiprocessing.pool.CLOSE  # or TERMINATE
>>> threading.enumerate()
[<_MainThread(MainThread, started 5632)>]
>>> 

That terminates the other service threads and also terminates the child processes.


I think one problem is that there is a resource-leak bug in the Python library, which could be fixed by proper use of weakrefs.
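
That library-internal fix is not shown here, but to illustrate the weakref idea at application level, one could tie pool termination to the lifetime of a small wrapper object. A minimal sketch, assuming Python 3.4+ for weakref.finalize; the AutoPool name is made up, not part of the standard library:

import multiprocessing
import weakref

class AutoPool(object):
    """Wrapper that terminates its Pool once the wrapper is garbage collected."""

    def __init__(self, *args, **kwargs):
        self._pool = multiprocessing.Pool(*args, **kwargs)
        # The finalizer holds a strong reference to the bound method
        # self._pool.terminate (and therefore to the pool), but only a
        # weak reference to the wrapper itself, so it fires as soon as
        # the wrapper becomes unreachable, or at interpreter exit.
        self._finalizer = weakref.finalize(self, self._pool.terminate)

    def __getattr__(self, name):
        # Delegate imap, map, apply_async, ... to the real pool.
        return getattr(self._pool, name)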

The other point is that Pool creation and termination are expensive (including three service threads per pool just for management!), and there is usually no reason to have many more worker processes than CPU cores (for CPU-heavy loads) or more than some limit imposed by another scarce resource (e.g. network bandwidth). So it is reasonable to treat a pool as a single, application-global resource (optionally managed by a timeout) rather than as a short-lived object held by a closure (or terminated via the terminate() workaround for the bug).

For example:

import atexit
import multiprocessing
from multiprocessing import Pool

CPUCORES = multiprocessing.cpu_count()   # limit workers to the number of CPU cores

try:
    _unused = pool   # reload-safe global variable
except NameError:
    pool = None

def get_pool():
    """Create the application-global pool lazily and reuse it afterwards."""
    global pool
    if pool is None:
        atexit.register(stop_pool)   # ensure workers are cleaned up at exit
        pool = Pool(CPUCORES)
    return pool

def stop_pool():
    global pool
    if pool:
        pool.terminate()
        pool = None
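
With such a global pool, the Volatile class from the question can keep its lazy imap approach. A minimal sketch; the worker function and parameter names are placeholders, not part of the original question:

def _work(item):
    # Placeholder worker function; defined at module level so the pool
    # can pickle it for the worker processes.
    return item

class Volatile(object):
    def do_stuff(self, items):
        # Reuse the one application-global pool instead of creating a
        # fresh Pool (and leaking its descriptors) on every call.
        return get_pool().imap(_work, items)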
answered Oct 27 '22 by kxr


In Python, you basically have no guarantee of when things will be destroyed; and in any case, this is not how multiprocessing pools are designed to be used.

The right thing to do is to share a single pool across multiple calls to the function. The easiest way to do that is to store the pool as a class (or, maybe, instance) variable:

class Dispatcher:
    pool = multiprocessing.Pool()
    def do_stuff(self, ...):
        result = self.pool.map(...)
        return result
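
For completeness, a runnable sketch of this approach combined with the lazy imap from the question; the worker function and arguments are placeholders, not part of the answer:

import multiprocessing

def _work(x):
    return x * x              # placeholder worker function

class Dispatcher:
    # One pool shared by every instance and every call. On platforms
    # that spawn rather than fork workers (e.g. Windows), creating the
    # Pool at class-definition time can be problematic, so it may need
    # to be created lazily there instead.
    pool = multiprocessing.Pool()

    def do_stuff(self, items):
        # imap keeps the lazy iterator style asked about in the question.
        return self.pool.imap(_work, items)

if __name__ == '__main__':
    d = Dispatcher()
    for value in d.do_stuff(range(5)):
        print(value)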
answered Oct 27 '22 by James