
How to monitor python's concurrent.futures.ProcessPoolExecutor?

We are using the ProcessPoolExecutor from concurrent.futures in a service that receives requests asynchronously and does the actual, synchronous processing in the process pool.

At one point we ran into the case that the process pool was exhausted, so new requests had to wait until other processes finished.

Is there a way to interrogate the process pool for its current usage? That would allow us to monitor its state and do proper capacity planning.

If there isn't, is there any good alternative process pool implementation with an asynchronous interface that supports such monitoring/capacity planning?

asked Feb 01 '18 by moritz

2 Answers

The simplest way is to extend ProcessPoolExecutor with the desired behaviour. The example below keeps the stdlib interface and does not touch implementation details. One caveat: done callbacks are invoked on one of the executor's internal threads, so the counter is guarded with a lock:

import threading
from concurrent.futures import ProcessPoolExecutor


class MyProcessPoolExecutor(ProcessPoolExecutor):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Done callbacks run on an executor-internal thread, so the
        # counter must be protected against concurrent updates.
        self._lock = threading.Lock()
        self._running_workers = 0

    def submit(self, *args, **kwargs):
        future = super().submit(*args, **kwargs)
        with self._lock:
            self._running_workers += 1
        future.add_done_callback(self._worker_is_done)
        return future

    def _worker_is_done(self, future):
        with self._lock:
            self._running_workers -= 1

    def get_pool_usage(self):
        return self._running_workers
answered Sep 28 '22 by void

I have recently solved this question for myself in a slightly different way. Simplified, here’s what I did:

  • I keep track of pending futures externally in a set that is defined in the scope of my main loop.
  • I attach a callback to each future, and this callback is a closure over the set of futures, allowing it to remove the future from the set when done.

So, given that done() is the actual callback function, defined elsewhere, the following is defined in the scope of my main loop:

bag = set()

def make_callback(b):
    # Closure over the set b; no 'nonlocal' is needed, since b is
    # mutated in place rather than rebound.
    def callback(f):
        b.remove(f)
        done(f)

    return callback

For each future f which I submit to the ProcessPoolExecutor, I add the callback:

f.add_done_callback(make_callback(bag))

At any time, it’s possible to see a list of pending and running futures by looking at the contents of bag, optionally filtered by the result of the future’s running() method. E.g.:

print(*bag, sep='\n')
print('running:', *(f for f in bag if f.running()))

For many straightforward use cases, a module-level set variable would probably work just as well as the closure.

answered Sep 28 '22 by wjv