Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

concurrent.futures.ThreadPoolExecutor.map(): timeout not working

import concurrent.futures
import time 

def process_one(i):
    try:                                                                             
        print("dealing with {}".format(i))                                           
        time.sleep(50)
        print("{} Done.".format(i))                                                  
    except Exception as e:                                                           
        print(e)

def process_many():
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: 
        executor.map(process_one,
                range(100),                                                          
                timeout=3)                                                           


if __name__ == '__main__':                                                           
    MAX_WORKERS = 10
    try:
        process_many()
    except Exception as e:                                                           
        print(e)      

The docs say:

The returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to Executor.map()

But here the script didn't raise any exception and kept waiting. Any suggestions?

like image 450
Hao Wang Avatar asked Jul 19 '18 10:07

Hao Wang


People also ask

What is concurrent futures ThreadPoolExecutor?

The concurrent. futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor , or separate processes, using ProcessPoolExecutor .

How does executor map work?

The ThreadPoolExecutor map() function supports target functions that take more than one argument by providing more than one iterable as arguments to the call to map(). For example, we can define a target function for map that takes two arguments, then provide two iterables to the call to map().

Is ThreadPoolExecutor faster?

ThreadPoolExecutor Can Be Slower for CPU-Bound Tasks Using the ThreadPoolExecutor for a CPU-bound task can be slower than not using it. This is because Python threads are constrained by the Global Interpreter Lock, or GIL.

Is ThreadPoolExecutor thread safe Python?

ThreadPoolExecutor Thread-Safety Although the ThreadPoolExecutor uses threads internally, you do not need to work with threads directly in order to execute tasks and get results. Nevertheless, when accessing resources or critical sections, thread-safety may be a concern.


1 Answers

As the docs specify, the timeout error will only be raised if you're calling the __next__() method on the map. To call this method, you could for example convert the output to a list:

from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n / 10


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    print('main: starting')
    try:
        # without this conversion to a list, the timeout error is not raised
        real_results = list(results) 
    except futures._base.TimeoutError:
        print("TIMEOUT")

Output:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-9_0: done with 1
ThreadPoolExecutor-9_1: done with 2
TIMEOUT
ThreadPoolExecutor-9_2: done with 3
ThreadPoolExecutor-9_3: done with 4
ThreadPoolExecutor-9_4: done with 5

Here, the n-th task sleeps for n seconds, so the timeout is raised after task 2 is completed.


EDIT: If you want to terminate the tasks that didn't complete, you could try the answers in this question (they don't use ThreadPoolExecutor.map() though), or you could just ignore the returned values from the other tasks and let them finish:

from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    outputs = []
    try:
        for i in results:
            outputs.append(i)
    except futures._base.TimeoutError:
        print("TIMEOUT")
    print(outputs)

Output:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-5_0: done with 1
ThreadPoolExecutor-5_1: done with 2
TIMEOUT
[1, 2]
ThreadPoolExecutor-5_2: done with 3
ThreadPoolExecutor-5_3: done with 4
ThreadPoolExecutor-5_4: done with 5
like image 127
TrakJohnson Avatar answered Oct 17 '22 01:10

TrakJohnson