Python multiprocessing: how to limit the number of waiting processes?

When running a large number of tasks (with large parameters) using Pool.apply_async, every task is queued immediately together with its arguments, and there is no limit on the number of waiting tasks. This can end up eating all available memory, as in the example below:

import multiprocessing
import numpy as np

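def f(a, b):
    # Solve the linear system a @ x = b.
    return np.linalg.solve(a, b)


def test():
    p = multiprocessing.Pool()
    for _ in range(1000):
        # Each call queues a 1000x1000 float matrix (about 8 MB) without
        # waiting for a free worker, so the backlog grows unbounded.
        p.apply_async(f, (np.random.rand(1000, 1000), np.random.rand(1000)))
    p.close()
    p.join()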

if __name__ == '__main__':
    test()

I'm looking for a way to bound the waiting queue, so that only a limited number of tasks can be waiting at any time and Pool.apply_async blocks while the queue is full.

André Panisson asked Jun 15 '12 18:06



2 Answers

multiprocessing.Pool has a private _taskqueue member. Despite the module name it is not a multiprocessing.Queue but a plain thread-safe Queue.Queue (queue.Queue in Python 3), which takes an optional maxsize parameter; unfortunately Pool constructs it without maxsize set.

I'd recommend subclassing multiprocessing.Pool with a copy-paste of multiprocessing.Pool.__init__ that passes maxsize to the _taskqueue constructor.

Monkey-patching the pool right after it is created would also work, because Queue.Queue reads its maxsize attribute on every put, but it depends on a private member and is therefore quite brittle (newer Python 3 releases use a queue.SimpleQueue here, which has no maxsize at all):

pool._taskqueue.maxsize = maxsize
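Where relying on private attributes is undesirable, one alternative (a different technique from the subclassing suggested above, not what this answer describes) is to guard apply_async with a semaphore that is released from the task's callbacks. The sketch below assumes Python 3, where apply_async accepts error_callback; BoundedSubmitter and max_pending are hypothetical names introduced only for illustration.

```python
import multiprocessing
import threading


class BoundedSubmitter:
    """Hypothetical helper: caps the number of submitted-but-unfinished tasks."""

    def __init__(self, pool, max_pending):
        self._pool = pool
        # One permit per task that may be queued or running at the same time.
        self._slots = threading.BoundedSemaphore(max_pending)

    def _release(self, _result_or_error):
        # Called by the pool when a task finishes, successfully or not.
        self._slots.release()

    def apply_async(self, func, args=(), kwds=None):
        # Blocks the caller while max_pending tasks are still outstanding.
        self._slots.acquire()
        try:
            return self._pool.apply_async(
                func, args, kwds or {},
                callback=self._release,
                error_callback=self._release,
            )
        except Exception:
            # Submission itself failed, so hand the permit back.
            self._slots.release()
            raise


if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        submitter = BoundedSubmitter(pool, max_pending=4)
        # Large arguments are only built once a slot is free.
        results = [submitter.apply_async(sum, (list(range(100_000)),))
                   for _ in range(20)]
        totals = [r.get() for r in results]
    print(len(totals), "tasks finished")
```

The callbacks run in the pool's result-handling thread, so they should stay short; here they only release a permit.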
ecatmur answered Oct 23 '22 01:10



Poll pool._taskqueue after each submission and wait while it holds more than the desired number of tasks:

import multiprocessing
import time

import numpy as np


def f(a, b):
    return np.linalg.solve(a, b)


def test(max_apply_size=100):
    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000, 1000), np.random.rand(1000)))

        # _taskqueue is a private attribute of Pool. Pause submitting until
        # the workers have drained the backlog down to the limit.
        while p._taskqueue.qsize() > max_apply_size:
            time.sleep(1)

    p.close()
    p.join()

if __name__ == '__main__':
    test()
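The same throttling idea can be sketched with the public API only, by keeping the AsyncResult objects in a queue and waiting on the oldest one once too many are outstanding. This is a minimal sketch under that assumption rather than the approach of the answer above; bounded_apply, arg_iter and max_pending are hypothetical names.

```python
import collections
import multiprocessing


def bounded_apply(pool, func, arg_iter, max_pending):
    """Hypothetical helper: yield results in submission order while keeping
    at most max_pending submitted tasks unfinished."""
    pending = collections.deque()
    for args in arg_iter:
        if len(pending) >= max_pending:
            # Block on the oldest task before submitting another one.
            yield pending.popleft().get()
        pending.append(pool.apply_async(func, args))
    # Drain whatever is still outstanding.
    while pending:
        yield pending.popleft().get()


if __name__ == "__main__":
    # A generator builds each large argument only when it is about to be used.
    arg_iter = ((list(range(100_000)),) for _ in range(20))
    with multiprocessing.Pool(2) as pool:
        totals = list(bounded_apply(pool, sum, arg_iter, max_pending=4))
    print(len(totals), "tasks finished")
```

Because the arguments come from a generator, only about max_pending argument sets exist at once, and no sleep interval has to be tuned.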
Roger Dahl answered Oct 23 '22 03:10
