Asynchronous multiprocessing with a worker pool in Python: how to keep going after timeout?

I would like to run a number of jobs using a pool of processes and apply a given timeout after which a job should be killed and replaced by another working on the next task.

I have tried to use the multiprocessing module, which offers a method to run a pool of workers asynchronously (e.g. using map_async), but there I can only set a "global" timeout after which all processes would be killed.

Is it possible to have an individual timeout after which only a single process that takes too long is killed and a new worker is added to the pool again instead (processing the next task and skipping the one that timed out)?

Here's a simple example to illustrate my problem:

def Check(n):
  import time
  if n % 2 == 0: # select some (arbitrary) subset of processes
    print "%d timeout" % n
    while 1:
      # loop forever to simulate some process getting stuck
      pass
  print "%d done" % n
  return 0

from multiprocessing import Pool
pool = Pool(processes=4)
result = pool.map_async(Check, range(10))
print result.get(timeout=1)    

After the timeout all workers are killed and the program exits. I would like instead that it continues with the next subtask. Do I have to implement this behavior myself or are there existing solutions?

Update

It is possible to kill the hanging workers and they are automatically replaced. So I came up with this code:

import multiprocessing

jobs = pool.map_async(Check, range(10))
while 1:
  try:
    print "Waiting for result"
    result = jobs.get(timeout=1)
    break # all clear
  except multiprocessing.TimeoutError: 
    # kill all processes
    for c in multiprocessing.active_children():
      c.terminate()
print result

The problem now is that the loop never exits; even after all tasks have been processed, calling get yields a timeout exception.

asked Jan 08 '14 by fuenfundachtzig

2 Answers

Currently Python does not provide a native way to control the execution time of each distinct task in the pool from outside the worker itself.
The easy way is therefore to use wait_procs from the psutil module and implement the tasks as subprocesses.
If non-standard libraries are undesirable, you have to implement your own pool on top of the subprocess module, keeping the work loop in the main process, poll()-ing each worker and performing the required actions when it exceeds its time budget.
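
A rough sketch of the psutil route (not from the original answer, just an illustration): each task is launched as a separate process, wait_procs enforces the deadline, and whatever is still alive afterwards is killed. The worker script name task.py and the 1-second timeout are made-up placeholders.

import psutil

# Launch each task as its own process; psutil.Popen wraps subprocess.Popen,
# so the handles can be passed straight to wait_procs.
# "task.py" is a hypothetical worker script taking the task number as argument.
procs = [psutil.Popen(["python", "task.py", str(n)]) for n in range(10)]

# Wait up to 1 second for the processes to finish; finished ones end up in
# "gone", stragglers in "alive".
gone, alive = psutil.wait_procs(procs, timeout=1)

# Anything still alive has exceeded its time budget: kill it and move on.
for p in alive:
    p.kill()

print "%d tasks finished, %d timed out" % (len(gone), len(alive))

A full Pool replacement would additionally keep only a fixed number of subprocesses alive at a time and start the next task whenever one finishes or is killed, which is essentially the poll()-based loop described above.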

As for the updated problem, the pool becomes corrupted if you directly terminate one of the workers (this is a bug in the interpreter implementation, because such behaviour should not be allowed): the worker is recreated, but the task is lost and the pool becomes unjoinable. You have to terminate the whole pool and then recreate it for the remaining tasks:

import multiprocessing
from multiprocessing import Pool
while True:
    pool = Pool(processes=4)
    jobs = pool.map_async(Check, range(10))
    print "Waiting for result"
    try:
        result = jobs.get(timeout=1)
        break # all clear
    except multiprocessing.TimeoutError: 
        # kill all processes
        pool.terminate()
        pool.join()
print result    

UPDATE

Pebble is an excellent and handy library which solves the issue. Pebble is designed for the asynchronous execution of Python functions, whereas PyExPool is designed for the asynchronous execution of modules and external executables, though both can be used interchangeably.

One more aspect: when third-party dependencies are not desirable, PyExPool can be a good choice. It is a single-file, lightweight implementation of a multi-process execution pool with per-job and global timeouts and the ability to group jobs into tasks, among other features.
PyExPool can be embedded into your sources and customized. It has a permissive Apache 2.0 license and production quality, being used in the core of a high-load scientific benchmarking framework.

answered by luart


The pebble Pool module has been built to solve these kinds of issues. It supports timeouts on individual tasks, allowing you to detect them and easily recover.

from pebble import ProcessPool
from concurrent.futures import TimeoutError

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1,2], timeout=5)

try:
    result = future.result()
except TimeoutError as error:
    print "Function took longer than %d seconds" % error.args[1]

For your specific example:

from pebble import ProcessPool
from concurrent.futures import TimeoutError

results = []

with ProcessPool(max_workers=4) as pool:
    future = pool.map(Check, range(10), timeout=5)

    iterator = future.result()

    # iterate over all results, if a computation timed out
    # print it and continue to the next result
    while True:
        try:
            result = next(iterator)
            results.append(result)
        except StopIteration:
            break  
        except TimeoutError as error:
            print "function took longer than %d seconds" % error.args[1] 

print results

answered by noxdafox