import concurrent.futures
import time
def process_one(i):
try:
print("dealing with {}".format(i))
time.sleep(50)
print("{} Done.".format(i))
except Exception as e:
print(e)
def process_many():
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
executor.map(process_one,
range(100),
timeout=3)
if __name__ == '__main__':
MAX_WORKERS = 10
try:
process_many()
except Exception as e:
print(e)
The docs say:
The returned iterator raises a
concurrent.futures.TimeoutError
if__next__()
is called and the result isn’t available aftertimeout
seconds from the original call toExecutor.map()
But here the script didn't raise any exception and kept waiting. Any suggestions?
The concurrent. futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor , or separate processes, using ProcessPoolExecutor .
The ThreadPoolExecutor map() function supports target functions that take more than one argument by providing more than one iterable as arguments to the call to map(). For example, we can define a target function for map that takes two arguments, then provide two iterables to the call to map().
ThreadPoolExecutor Can Be Slower for CPU-Bound Tasks Using the ThreadPoolExecutor for a CPU-bound task can be slower than not using it. This is because Python threads are constrained by the Global Interpreter Lock, or GIL.
ThreadPoolExecutor Thread-Safety Although the ThreadPoolExecutor uses threads internally, you do not need to work with threads directly in order to execute tasks and get results. Nevertheless, when accessing resources or critical sections, thread-safety may be a concern.
As the docs specify, the timeout error will only be raised if you're calling the __next__()
method on the map. To call this method, you could for example convert the output to a list:
from concurrent import futures
import threading
import time
def task(n):
print("Launching task {}".format(n))
time.sleep(n)
print('{}: done with {}'.format(threading.current_thread().name, n))
return n / 10
with futures.ThreadPoolExecutor(max_workers=5) as ex:
results = ex.map(task, range(1, 6), timeout=3)
print('main: starting')
try:
# without this conversion to a list, the timeout error is not raised
real_results = list(results)
except futures._base.TimeoutError:
print("TIMEOUT")
Output:
Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-9_0: done with 1
ThreadPoolExecutor-9_1: done with 2
TIMEOUT
ThreadPoolExecutor-9_2: done with 3
ThreadPoolExecutor-9_3: done with 4
ThreadPoolExecutor-9_4: done with 5
Here, the n-th task sleeps for n
seconds, so the timeout is raised after task 2 is completed.
EDIT: If you want to terminate the tasks that didn't complete, you could try the answers in this question (they don't use ThreadPoolExecutor.map()
though), or you could just ignore the returned values from the other tasks and let them finish:
from concurrent import futures
import threading
import time
def task(n):
print("Launching task {}".format(n))
time.sleep(n)
print('{}: done with {}'.format(threading.current_thread().name, n))
return n
with futures.ThreadPoolExecutor(max_workers=5) as ex:
results = ex.map(task, range(1, 6), timeout=3)
outputs = []
try:
for i in results:
outputs.append(i)
except futures._base.TimeoutError:
print("TIMEOUT")
print(outputs)
Output:
Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-5_0: done with 1
ThreadPoolExecutor-5_1: done with 2
TIMEOUT
[1, 2]
ThreadPoolExecutor-5_2: done with 3
ThreadPoolExecutor-5_3: done with 4
ThreadPoolExecutor-5_4: done with 5
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With