 

Timeout for each thread in ThreadPool in Python

I am using Python 2.7.

I am currently using ThreadPoolExecutor like this:

import concurrent.futures

params = [1,2,3,4,5,6,7,8,9,10]
with concurrent.futures.ThreadPoolExecutor(5) as executor:
    result = list(executor.map(f, params))

The problem is that f sometimes runs for too long. I want to limit each run of f to 100 seconds, and kill it if it exceeds that.

Eventually, for each element x in params, I would like to have an indication of whether or not f had to be killed, and if it wasn't, what its return value was. Even if f times out for one parameter, I still want to run it with the next parameters.

The executor.map method does have a timeout parameter, but it sets a timeout for the entire run, counted from the call to executor.map, not for each task separately.
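For reference, a sketch of those whole-run semantics (using the same executor as above):

# The timeout is measured from the original call to executor.map and
# applies to the returned iterator as a whole:
results_iter = executor.map(f, params, timeout=100)
list(results_iter)  # raises concurrent.futures.TimeoutError once 100
                    # seconds have passed overall, aborting remaining results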

What is the easiest way to get my desired behavior?

asked Sep 22 '14 by user302099

People also ask

How does a thread pool work in Python?

In Python, a thread pool is a group of idle threads that are pre-instantiated and stand ready to be given tasks. We can either create a new thread for each task or reuse threads from a pool; when the number of tasks is large, a thread pool is preferred over spawning a thread per task.
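For illustration, a minimal sketch with concurrent.futures (the fetch function and URLs are placeholders):

import concurrent.futures

def fetch(url):
    return len(url)  # placeholder for an I/O-bound task

urls = ["http://a.example", "http://b.example", "http://c.example"]
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Two pre-started worker threads are reused across all three tasks.
    results = list(executor.map(fetch, urls))
print(results)  # [16, 16, 16]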

What are the threading limitations of Python?

A Python process cannot run threads in parallel, but it can run them concurrently through context switching during I/O-bound operations. This limitation is enforced by the Python Global Interpreter Lock (GIL), which prevents threads within the same process from executing Python bytecode at the same time.
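A small sketch of that effect (timings will vary by machine):

import threading, time

def cpu_work(n):
    # Pure-Python CPU-bound loop; the GIL serializes it across threads.
    while n:
        n -= 1

start = time.time()
threads = [threading.Thread(target=cpu_work, args=(10**7,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Takes roughly as long as running both loops serially, despite two threads.
print("two threads: %.2fs" % (time.time() - start))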

How do you stop a thread pool in Python?

You can call the cancel() function on the Future object to cancel the task before it has started running. If your task has already started running, then calling cancel() will have no effect and you must wait for the task to complete.
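A minimal sketch of that behavior (the slow function is a placeholder):

import concurrent.futures, time

def slow(x):
    time.sleep(1)
    return x

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(slow, 1)  # starts immediately on the only worker
    queued = executor.submit(slow, 2)   # waits in the queue behind it
    print(queued.cancel())   # True: cancelled before it started
    print(running.cancel())  # typically False: already running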


1 Answer

This answer is in terms of Python's multiprocessing library, which is usually preferable to the threading library unless your functions are just waiting on network calls. Note that the multiprocessing and threading libraries largely share the same interface.

Given that your processes may run for 100 seconds each, the overhead of creating a process for each one is fairly small in comparison. You probably have to manage your own processes to get the necessary control.

One option is to wrap f in another function that will execute for at most 100 seconds:

from multiprocessing import Pool

def timeout_f(arg):
    # Run f in a single-worker pool; get() raises multiprocessing.TimeoutError
    # if f takes longer than 100 seconds.
    pool = Pool(processes=1)
    try:
        return pool.apply_async(f, [arg]).get(timeout=100)
    finally:
        pool.terminate()  # kill the worker whether f finished or timed out

Then your code changes to:

    result = list(executor.map(timeout_f, params))
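Note that timeout_f as written raises TimeoutError into the map, which would abort the remaining results. A minimal sketch of a variant that instead records the timeout per element (the TIMED_OUT sentinel is an illustrative name, not part of any library):

from multiprocessing import Pool, TimeoutError

TIMED_OUT = object()  # hypothetical sentinel marking a killed run

def timeout_f(arg):
    pool = Pool(processes=1)
    try:
        return pool.apply_async(f, [arg]).get(timeout=100)
    except TimeoutError:
        return TIMED_OUT  # f was killed for this arg; continue with the rest
    finally:
        pool.terminate()

result = list(executor.map(timeout_f, params))
was_killed = [r is TIMED_OUT for r in result]  # per-element indication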

Alternatively, you could write your own thread/process control:

from multiprocessing import Process
from time import time

def chunks(l, n):
    """ Yield successive n-sized chunks from l. """
    for i in xrange(0, len(l), n):
        yield l[i:i+n]

processes = [Process(target=f, args=(i,)) for i in params]
exit_codes = []
for five_processes in chunks(processes, 5):
    for p in five_processes:
        p.start()
    start = time()
    for p in five_processes:
        # Wait out whatever remains of the shared 100-second budget.
        p.join(max(0, 100 - (time() - start)))
        p.terminate()  # no-op if the process has already exited
    for p in five_processes:
        p.join()  # reap terminated processes so exitcode is populated
        exit_codes.append(p.exitcode)

You'd have to get the return values through something like the approach in "Can I get a return value from multiprocessing.Process?".
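A minimal sketch of that idea, assuming f's return value can be pickled (the run_f wrapper and queue wiring are illustrative, not from the linked question):

from multiprocessing import Process, Queue

def run_f(arg, q):
    q.put(f(arg))  # ship the return value back to the parent

q = Queue()
p = Process(target=run_f, args=(3, q))
p.start()
p.join(100)    # wait at most 100 seconds
p.terminate()  # no-op if it already finished
p.join()       # reap so exitcode is populated
value = q.get() if p.exitcode == 0 else None  # None means f was killed or crashed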

A process's exitcode is 0 if it completed normally and non-zero (negative on Unix, indicating the signal that killed it) if it was terminated.

Techniques from: "Join a group of python processes with a timeout" and "How do you split a list into evenly sized chunks?"


As another option, you could just try to use apply_async on multiprocessing.Pool:

from multiprocessing import Pool, TimeoutError

if __name__ == "__main__":
    pool = Pool(processes=5)
    # apply_async returns AsyncResult objects without blocking.
    async_results = [pool.apply_async(f, [i]) for i in params]
    results = []
    for async_result in async_results:
        try:
            results.append(async_result.get(timeout=100))
        except TimeoutError as e:
            results.append(e)

Note that the above possibly waits more than 100 seconds for each task: if the first one takes 50 seconds to complete, the second will have had 50 extra seconds of run time. Also, a TimeoutError from get only stops the wait; the worker keeps running the task in the background. More complicated logic (such as the previous example) is needed to enforce stricter timeouts and actually kill stragglers.
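A hedged sketch of one such tightening, which treats 100 seconds as a shared budget for the whole batch (that budget policy is an assumption; per-task start times aren't visible through apply_async, so exact per-task limits still need the Process-based approach above):

from multiprocessing import Pool, TimeoutError
from time import time

if __name__ == "__main__":
    pool = Pool(processes=5)
    async_results = [pool.apply_async(f, [i]) for i in params]
    start = time()
    results = []
    for r in async_results:
        try:
            # Only wait out whatever remains of the shared 100-second budget;
            # get(timeout=0) raises TimeoutError immediately if not ready.
            remaining = max(0, 100 - (time() - start))
            results.append(r.get(timeout=remaining))
        except TimeoutError as e:
            results.append(e)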

answered Sep 27 '22 by Zags