Python multiprocessing.Pool: kill a *specific* long-running or hung process

I need to run a pool of many parallel database connections and queries. I would like to use a multiprocessing.Pool or a concurrent.futures ProcessPoolExecutor. I am using Python 2.7.5.

In some cases, query requests take too long or never finish (hung processes). I would like to kill only the specific process in the multiprocessing.Pool or concurrent.futures ProcessPoolExecutor that has timed out.

I have seen an example of how to kill and re-spawn the entire process pool, but ideally I would avoid that CPU thrashing, since I only want to kill the specific long-running process that has not returned data after timeout seconds.
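A minimal sketch of one way to get exactly this behaviour without touching Pool internals: run each query in its own multiprocessing.Process, wait on a per-task pipe until one shared deadline, and call terminate() only on the processes that have not answered. The names query and run_each_with_timeout are hypothetical, and time.sleep stands in for a real database call. It is written for Python 3; it should also run on 2.7 apart from the print formatting.

```python
import multiprocessing as mp
import time


def query(x, conn):
    # Hypothetical stand-in for a database query: sleeps x seconds, then sends x back.
    time.sleep(x)
    conn.send(x)
    conn.close()


def run_each_with_timeout(inputs, timeout):
    """Run one Process per input and terminate only those still busy after `timeout` seconds."""
    jobs = []
    for x in inputs:
        recv_end, send_end = mp.Pipe(duplex=False)
        p = mp.Process(target=query, args=(x, send_end))
        p.start()
        send_end.close()  # the parent keeps only the receiving end
        jobs.append((x, p, recv_end))

    deadline = time.time() + timeout  # one shared deadline for all tasks
    done, killed = {}, []
    for x, p, recv_end in jobs:
        # Wait only for whatever is left of the shared deadline.
        if recv_end.poll(max(0.0, deadline - time.time())):
            done[x] = recv_end.recv()
            p.join()
        else:
            p.terminate()  # kills this hung process only; the others are untouched
            p.join()
            killed.append(x)
        recv_end.close()
    return done, killed


if __name__ == "__main__":
    done, killed = run_each_with_timeout([0.1, 0.2, 30], timeout=1.0)
    print(done, killed)
```

The trade-off is that every task pays for a new process (and, for real queries, a new database connection), which is the cost a pool normally avoids.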

For some reason, the code below never manages to terminate/join the process Pool after all results have been returned. It may have to do with killing worker processes when a timeout occurs; however, the Pool does create new workers when they are killed, and the results are as expected.

from multiprocessing import Pool
import time
import numpy as np

def f(x):
    # Simulates a query that takes x seconds.
    time.sleep(x)
    return x

if __name__ == '__main__':
    pool = Pool(processes=4, maxtasksperchild=4)

    # Submit 10 tasks with random durations of 0-9 seconds.
    results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()]

    while results:
        try:
            x, result = results.pop(0)
            start = time.time()
            print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start)

        except Exception as e:
            print str(e)
            print '%d Timeout Exception! in %f' % (x, time.time()-start)
            # Terminates every worker that is still running, not only the one that timed out.
            for p in pool._pool:
                if p.exitcode is None:
                    p.terminate()

    # As described above, these calls do not return.
    pool.terminate()
    pool.join()
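The loop above relies on AsyncResult.get(timeout=...). A small sketch (Python 3; slow and timeout_does_not_cancel are hypothetical names) of what that call does and does not do: when the timeout expires it raises multiprocessing.TimeoutError, but the task keeps running in its worker, so a later get() still returns the value. Catching multiprocessing.TimeoutError instead of a bare Exception also keeps errors raised inside the task, which get() re-raises, from being reported as timeouts.

```python
import multiprocessing as mp
import time


def slow(seconds):
    # Hypothetical task: sleeps, then returns its argument.
    time.sleep(seconds)
    return seconds


def timeout_does_not_cancel():
    pool = mp.Pool(processes=1)
    try:
        res = pool.apply_async(slow, (0.5,))
        try:
            res.get(timeout=0.1)
            timed_out = False
        except mp.TimeoutError:
            timed_out = True  # only the wait was abandoned...
        value = res.get()  # ...the worker kept running the task and still delivers 0.5
        return timed_out, value
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(timeout_does_not_cancel())  # (True, 0.5)
```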
asked Nov 18 '13 at 18:11 by dragoljub

1 Answer

I don't fully understand your question. You say you want to stop one specific process, but in your exception handler you call terminate() on every worker that is still running. I'm not sure why you are doing that. Also, I am fairly sure that using internal attributes of multiprocessing.Pool (such as pool._pool) is not safe. That said, I think your real question is why this program does not finish after a timeout has occurred. If that is the problem, the following does the trick:

from multiprocessing import Pool
import time
import numpy as np

def f(x):
    time.sleep(x)
    return x

if __name__ == '__main__':
    pool = Pool(processes=4, maxtasksperchild=4)

    results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()]

    start = time.time()  # elapsed times below are measured from this point
    while results:
        try:
            x, result = results.pop(0)
            print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start)
        except Exception as e:
            print str(e)
            print '%d Timeout Exception! in %f' % (x, time.time()-start)
            # Walk the (private) worker list backwards so that deleting by index
            # does not shift the entries that have not been visited yet.
            for i in reversed(range(len(pool._pool))):
                p = pool._pool[i]
                if p.exitcode is None:
                    p.terminate()
                # Remove the worker from the pool's list whether or not it was terminated.
                del pool._pool[i]

    pool.terminate()
    pool.join()

The key point is that you need to remove the workers from the pool's internal worker list (pool._pool); just calling terminate() on them is not enough.
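A sketch of killing only the worker that is running a timed-out task, without reading pool._pool: each task records its own PID in a multiprocessing.Manager dict, and on timeout the parent sends SIGTERM to that PID with os.kill (assuming a POSIX system). It relies on the Pool replacing a worker that dies, as the question observes. Known gaps: there is a small race between reading the PID and the task finishing on its own, and the killed task's result never arrives, so the pool is shut down with terminate() rather than close() + join(). The names tracked_query and run_pool_with_timeout are hypothetical; written for Python 3.

```python
import multiprocessing as mp
import os
import signal
import time


def tracked_query(task_id, seconds, pids):
    # Hypothetical stand-in for a database query. It records which worker PID is
    # running it, so the parent can kill that one worker without Pool internals.
    pids[task_id] = os.getpid()
    try:
        time.sleep(seconds)
        return task_id
    finally:
        pids.pop(task_id, None)  # finished: this PID must no longer be targeted


def run_pool_with_timeout(durations, timeout):
    """Run tasks in a Pool; SIGTERM only the workers whose task misses the deadline."""
    manager = mp.Manager()
    pids = manager.dict()  # task_id -> PID of the worker currently running it
    # One worker per task so that every task starts right away (a sketch assumption).
    pool = mp.Pool(processes=len(durations))
    try:
        pending = [(i, pool.apply_async(tracked_query, (i, d, pids)))
                   for i, d in enumerate(durations)]
        deadline = time.time() + timeout
        done, killed = [], []
        for task_id, res in pending:
            try:
                done.append(res.get(max(0.0, deadline - time.time())))
            except mp.TimeoutError:
                pid = pids.get(task_id)
                if pid is not None:
                    os.kill(pid, signal.SIGTERM)  # the Pool starts a replacement worker
                killed.append(task_id)
        return done, killed
    finally:
        # terminate() rather than close() + join(): the killed task never reports
        # a result, and join() after close() would keep waiting for it.
        pool.terminate()
        pool.join()
        manager.shutdown()


if __name__ == "__main__":
    print(run_pool_with_timeout([0.1, 0.2, 30], timeout=1.0))
```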

answered Oct 15 '22 at 19:10 by stacksia