Handle multiprocessing.TimeoutError in multiprocessing pool.map_async()

So far I do this:

rets=set(pool.map_async(my_callback, args.hosts).get(60*4))

If the timeout is hit, I get an exception:

 File "/usr/lib/python2.7/multiprocessing/pool.py", line 524, in get
    raise TimeoutError
multiprocessing.TimeoutError

I would like to handle this gracefully:

The output for all hosts I could reach should go into rets and all hosts which timed out should go into a separate list.

How could this be done?

Update

Six years later, I think it makes more sense to use Go instead of Python for concurrent applications.

guettli asked Jan 06 '23 23:01


1 Answer

As far as I know, you can't, at least not with map_async. map_async is a convenience method for the case where you want the results of the whole batch at once: its single get() call either returns every result or raises. That doesn't match your situation, because you want finer control over individual tasks.
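To make the all-or-nothing behaviour concrete, here is a minimal sketch. The names map_or_nothing and nap are made up for illustration, and the sketch assumes Python 3 (it uses the pool as a context manager).

```python
from multiprocessing import Pool, TimeoutError
from time import sleep


def map_or_nothing(pool, func, items, timeout):
    """Return the full result list, or None if the batch missed the timeout."""
    async_result = pool.map_async(func, items)
    try:
        return async_result.get(timeout)
    except TimeoutError:
        # One result object covers the whole batch, so the results of the
        # tasks that did finish are not available through the public API.
        return None


def nap(seconds):
    # Hypothetical stand-in for real work.
    sleep(seconds)
    return seconds


if __name__ == "__main__":
    # Leaving the with-block terminates the pool, including the slow task.
    with Pool(3) as pool:
        print(map_or_nothing(pool, nap, [0, 0, 3], timeout=1))
```

Two of the three tasks finish almost immediately here, but because the third one misses the timeout, the caller gets nothing back.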

However, you can still do it by using the finer-grained methods of the pool. In particular, apply_async submits one task at a time and returns a separate result object for each, which gives you much more control over how to handle the success or failure of individual tasks.

Below is a pretty minimal example that I think does what you want:

from multiprocessing.pool import Pool, TimeoutError
from time import sleep, time


def task_function(xx):
    print('Task %d running' % xx)
    sleep(xx)
    print('Task %d ended' % xx)
    return 'Result of task %d' % xx

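
if __name__ == '__main__':  # needed where workers are spawned, e.g. on Windows
    pl = Pool()
    # Submit each task separately so that each has its own result object.
    results = [
        pl.apply_async(task_function, (_xx,))
        for _xx in range(10)]

    # One shared deadline for all tasks, 5 seconds from now.
    start = time()
    wait_until = start + 5

    rets = []
    timed_out_results = []

    for res in results:
        # Wait only for the time left until the deadline, never a negative amount.
        timeout = max(0, wait_until - time())

        try:
            rets.append(res.get(timeout))
        except TimeoutError:
            timed_out_results.append(res)

    print('%s ended' % (rets,))
    print('%s timedout' % (timed_out_results,))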

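This runs 10 tasks that each print a line, sleep, then print another line. The first sleeps for 0 seconds, the next for 1, the next for 2, and so on. We stop waiting 5 seconds after the tasks are submitted, so we expect about 5 tasks to have finished and about 5 to have timed out. The exact split depends on timing and on how many worker processes the pool has, since Pool() defaults to one per CPU and later tasks may have to wait for a free worker.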

Be aware that I have not stopped the tasks that are still running, so in the real world they may continue and even finish in the time it takes to print the results. You'll have to decide how much you care about that and what to do about it.
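Applied to the question, one possible approach is to keep each host next to its result object, so that timed-out hosts (rather than result objects) end up in the second list, and to terminate the pool afterwards so that stragglers do not keep running. This is only a sketch: collect_with_deadline, check_host and the host names are hypothetical, and any exception raised inside a task is re-raised by get() and not handled here.

```python
from multiprocessing import Pool, TimeoutError
from time import monotonic, sleep


def collect_with_deadline(pool, func, items, timeout):
    """Run func(item) for every item and split items by whether they finished.

    Returns (results, timed_out): results maps item -> return value,
    timed_out lists the items whose task was not done by the deadline.
    """
    # Submit everything first so the tasks run concurrently.
    pending = [(item, pool.apply_async(func, (item,))) for item in items]
    deadline = monotonic() + timeout

    results, timed_out = {}, []
    for item, async_result in pending:
        # The deadline is shared by all tasks; never pass a negative timeout.
        remaining = max(0.0, deadline - monotonic())
        try:
            results[item] = async_result.get(remaining)
        except TimeoutError:
            timed_out.append(item)
    return results, timed_out


def check_host(host):
    # Hypothetical stand-in for the real per-host work.
    sleep(3 if host == "slow.example" else 0.1)
    return "ok: " + host


if __name__ == "__main__":
    hosts = ["a.example", "slow.example", "b.example"]
    pool = Pool(processes=3)
    try:
        results, timed_out = collect_with_deadline(pool, check_host, hosts, timeout=1)
    finally:
        pool.terminate()  # stop workers that are still busy
        pool.join()
    print(results)
    print(timed_out)
```

Note that terminate() stops the worker processes abruptly, so it only fits tasks that can safely be interrupted; if they cannot, close() followed by join() lets them run to completion instead.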

daphtdazz answered Jan 10 '23 21:01