Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I abort a task in a multiprocessing.Pool after a timeout?

I am trying to use the multiprocessing package of python in this way:

featureClass = [[1000, k, 1] for k in drange(start, end, step)] #list of arguments
for f in featureClass:
  pool.apply_async(worker, args=f, callback=collectMyResult)
pool.close()
pool.join

From processes of the pool I want to avoid waiting those which take more than 60s to return its result. Is that possible?

like image 470
farhawa Avatar asked Apr 07 '15 14:04

farhawa


People also ask

How do you terminate a multiprocessing pool?

The process pool can be forcefully shutdown by calling the Pool. terminate() function.

How do you close a multiprocessing process in python?

We can kill or terminate a process immediately by using the terminate() method. We will use this method to terminate the child process, which has been created with the help of function, immediately before completing its execution.

How do processes pools work in multiprocessing?

Pool allows multiple jobs per process, which may make it easier to parallel your program. If you have a numbers jobs to run in parallel, you can make a Pool with number of processes the same number of as CPU cores and after that pass the list of the numbers jobs to pool. map.

What is the difference between pool and process in multiprocessing?

Pool supports multiple tasks, whereas the multiprocessing. Process class supports a single task. The Pool is designed to submit and execute multiple tasks. For example, the map(), imap(), and starmap() functions are explicitly designed to perform multiple function calls in parallel.


Video Answer


1 Answers

Here's a way you can do this without needing to change your worker function. There are two steps required:

  1. Use the maxtasksperchild option you can pass to multiprocessing.Pool to ensure the worker processes in the pool are restarted after every task execution.
  2. Wrap your existing worker function in another function, which will call worker in a daemon thread, and then wait for a result from that thread for timeout seconds. Using a daemon thread is important because processes won't wait for daemon threads to finish before exiting.

If the timeout expires, you exit (or abort - it's up to you) the wrapper function, which will end the task, and because you've set maxtasksperchild=1, cause the Pool to terminate the worker process and start a new one. This will mean that the background thread doing your real work also gets aborted, because it's a daemon thread, and the process it's living got shut down.

import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

def worker(x, y, z):
    pass # Do whatever here

def collectMyResult(result):
    print("Got result {}".format(result))

def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        out = res.get(timeout)  # Wait timeout seconds for func to complete.
        return out
    except multiprocessing.TimeoutError:
        print("Aborting due to timeout")
        raise

if __name__ == "__main__":
    pool = multiprocessing.Pool(maxtasksperchild=1)
    featureClass = [[1000,k,1] for k in range(start,end,step)] #list of arguments
    for f in featureClass:
      abortable_func = partial(abortable_worker, worker, timeout=3)
      pool.apply_async(abortable_func, args=f,callback=collectMyResult)
    pool.close()
    pool.join()

Any function that timeouts will raise multiprocessing.TimeoutError. Note that this means your callback won't execute when a timeout occurs. If this isn't acceptable, just change the except block of abortable_worker to return something instead of calling raise.

Also keep in mind that restarting worker processes after every task execution will have a negative impact on the performance of the Pool, due to the increased overhead. You should measure that for your use-case and see if the trade-off is worth it to have the ability to abort the work. If it's a problem, you may need to try another approach, like co-operatively interrupting worker if it has run too long, rather than trying to kill it from the outside. There are many questions on SO that cover this topic.

like image 179
dano Avatar answered Sep 18 '22 14:09

dano