Multiprocessing - map over list, killing processes that stall above timeout limit

I have a list of elements that I want to modify using multiprocessing. The issue is that for some particular inputs (which cannot be identified before attempting them), part of my function stalls. I've shown this conceptually with the code below, where the function sometimes_stalling_processing() will occasionally stall indefinitely.

To put this into context, I'm processing a bunch of links using a web scraper, and some of these links stall even when I pass a timeout to the requests module. I've attempted different approaches (e.g. using eventlet), but have come to the conclusion that it's perhaps easier to handle it at the multiprocessing level.

from multiprocessing import Pool

def stable_processing(obs):
    ...
    return processed_obs

def sometimes_stalling_processing(obs):
    ...
    return processed_obs

def extract_info(obs):
    new_obs = stable_processing(obs)
    try:
        new_obs = sometimes_stalling_processing(obs)
    except MyTimedOutError: # error doesn't exist, just here for conceptual purposes
        pass
    return new_obs

pool = Pool(processes=n_threads)
processed_dataset = pool.map(extract_info, dataset)
pool.close()
pool.join()

This question (How can I abort a task in a multiprocessing.Pool after a timeout?) seems very similar, but I've been unable to adapt it to work with map instead of apply. I've also tried the eventlet package, but that didn't work either. Note that I'm using Python 2.7.

How do I make pool.map() time out on individual observations and kill sometimes_stalling_processing?

pir asked Jun 07 '17 01:06



1 Answer

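You can take a look at the pebble library. Its ProcessPool.map accepts a timeout argument, and a worker process whose task exceeds that timeout is stopped rather than left running.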

from pebble import ProcessPool
from concurrent.futures import TimeoutError

def sometimes_stalling_processing(obs):
    ...
    return processed_obs

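with ProcessPool() as pool:
    # timeout is in seconds and applies to each task, not to the whole map
    future = pool.map(sometimes_stalling_processing, dataset, timeout=10)

    # result() returns an iterator over the individual task results
    iterator = future.result()

    while True:
        try:
            result = next(iterator)
        except StopIteration:
            break
        except TimeoutError as error:
            # this task timed out; the loop carries on with the next element
            print("function took longer than %d seconds" % error.args[1])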

More examples in the documentation.
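If installing pebble is not an option, the same kill-on-timeout idea can be sketched with the standard library alone. This is not how pebble is implemented, and it is not a drop-in replacement for pool.map(): it is a minimal illustration that starts one child process per element, polls a pipe for each result, and terminates any child that passes its deadline. It assumes Python 3 on a Unix-like system (it uses the "fork" start method). The names map_with_timeout, _run_in_child, and demo_task are hypothetical, and calls that time out or raise are both replaced by a default value.

```python
import multiprocessing as mp
import time

# Assumption: a Unix-like OS, where the "fork" start method is available.
_ctx = mp.get_context("fork")


def _run_in_child(func, item, conn):
    """Child-process entry point: send (ok, value) back through the pipe."""
    try:
        conn.send((True, func(item)))
    except Exception:
        conn.send((False, None))
    finally:
        conn.close()


def map_with_timeout(func, items, timeout, n_workers=4, default=None):
    """Map func over items in parallel, one process per item.

    A call that runs longer than `timeout` seconds is terminated and its
    slot in the returned list keeps `default`; so does a call that raises.
    """
    items = list(items)
    results = [default] * len(items)
    pending = list(enumerate(items))[::-1]  # reversed so pop() yields items in order
    running = {}                            # index -> (process, pipe, deadline)

    while pending or running:
        # Start new children until the concurrency limit is reached.
        while pending and len(running) < n_workers:
            index, item = pending.pop()
            recv_end, send_end = _ctx.Pipe(duplex=False)
            proc = _ctx.Process(target=_run_in_child, args=(func, item, send_end))
            proc.start()
            send_end.close()  # the parent only reads from the pipe
            running[index] = (proc, recv_end, time.monotonic() + timeout)

        # Collect finished children and terminate the ones past their deadline.
        for index, (proc, recv_end, deadline) in list(running.items()):
            if recv_end.poll():
                try:
                    ok, value = recv_end.recv()
                    if ok:
                        results[index] = value
                except EOFError:
                    pass  # child died without reporting; keep the default
            elif time.monotonic() > deadline:
                proc.terminate()
            else:
                continue  # still running and within its deadline
            proc.join()
            recv_end.close()
            del running[index]

        time.sleep(0.01)  # avoid a busy loop while waiting

    return results


def demo_task(x):
    # Hypothetical stand-in for sometimes_stalling_processing: stalls on negative input.
    if x < 0:
        time.sleep(60)
    return x * 2
```

A call such as map_with_timeout(demo_task, [1, -1, 3], timeout=1.0) returns the processed values in input order, with the default in the position of the stalled element. Starting a fresh process per element is more expensive than reusing pool workers, so this trade-off only makes sense when each call is slow compared with process start-up, as with network requests.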

noxdafox answered Nov 15 '22 18:11
