Why does this ThreadPoolExecutor execute futures way before they are called?
import concurrent.futures
import time

def sleep_test(order_number):
    num_seconds = 0.5
    print(f"Order {order_number} - Sleeping {num_seconds} seconds")
    time.sleep(num_seconds)
    print(f"Order {order_number} - Slept {num_seconds} seconds")
    if order_number == 4:
        raise Exception("Reached order #4")

def main():
    order_numbers = [i for i in range(10_000)]
    max_number_of_threads = 2
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_number_of_threads) as executor:
        futures = []
        for order in order_numbers:
            futures.append(executor.submit(sleep_test, order_number=order))
        for future in futures:
            if future.cancelled():
                continue
            try:
                _ = future.result()
            except Exception:
                print("Caught Exception, stopping all future orders")
                executor.shutdown(wait=False, cancel_futures=True)

if __name__ == "__main__":
    main()
Here is a sample execution:
$ python3 thread_pool_test.py
Order 0 - Sleeping 0.5 seconds
Order 1 - Sleeping 0.5 seconds
Order 0 - Slept 0.5 seconds
Order 1 - Slept 0.5 seconds
Order 2 - Sleeping 0.5 seconds
Order 3 - Sleeping 0.5 seconds
Order 2 - Slept 0.5 seconds
Order 4 - Sleeping 0.5 seconds
Order 3 - Slept 0.5 seconds
Order 5 - Sleeping 0.5 seconds
Order 4 - Slept 0.5 seconds
Order 6 - Sleeping 0.5 seconds
Caught Exception, stopping all future orders
Order 5 - Slept 0.5 seconds
Order 4706 - Sleeping 0.5 seconds
Order 6 - Slept 0.5 seconds
Order 4706 - Slept 0.5 seconds
All of a sudden, Order 4706 is called seemingly out of nowhere, which doesn't make sense to me. I expect the threads to stop at around Order 5 or 6, which is when the exception is hit. Sometimes when I run the script it works as expected, but other times it calls a function that is thousands of "futures" in the future.
Why is this happening? Can I stop this from happening?
You already got a good answer (thanks, @ken!), but since I typed this up before, I'll post it now anyway.
There's generally no answer to questions like this short of staring at the implementation. Did anything in the docs promise that things would work here the way you expected? If not, you're at the mercy of the implementation.
In general, nothing about multi-thread or multi-process behavior is "synchronized" as you might expect it to be, unless the docs promise it, or you force it with your own machinery.
In this case, an Executor object maintains a queue of work items to be executed, and the code that passes those items out for execution runs in a different thread than Executor.shutdown() runs in (that runs in the thread you called it from, which in your test case is the program's main thread). The threads in the thread pool also extract work items from that queue.
So there's ample opportunity for race conditions under the covers, as multiple threads contend over who gets to extract the next item from the queue of pending work items. Look at the source code in Lib/concurrent/futures/thread.py, method ThreadPoolExecutor.shutdown(). The relevant part loops over the queue one at a time until the queue is empty:
while True:
    try:
        work_item = self._work_queue.get_nowait()
    except queue.Empty:
        break
    if work_item is not None:
        work_item.future.cancel()
But otherwise-idle worker threads in the pool are also looping over the queue simultaneously, looking for new work items to work on. "Who wins" isn't defined, or predictable.
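Here's a stripped-down toy sketch of that contention (my own example, not the stdlib code): one thread drains a queue the way shutdown() does, while a second pulls items the way an otherwise-idle worker does. Every item goes to exactly one thread, but which thread gets any particular item is up to the scheduler.

```python
import queue
import threading

q = queue.Queue()
for i in range(100_000):
    q.put(i)

counts = {"drainer": 0, "worker": 0}  # how many items each thread grabbed

def consume(name):
    # Both the shutdown loop and a worker's hunt for new work boil down
    # to this pattern: pull items with get_nowait() until the queue is empty.
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            break
        counts[name] += 1

t1 = threading.Thread(target=consume, args=("drainer",))  # stands in for shutdown()
t2 = threading.Thread(target=consume, args=("worker",))   # stands in for an idle worker
t1.start(); t2.start()
t1.join(); t2.join()

# The split is unpredictable and varies from run to run.
print(counts)
```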
In your test program, the worker threads are sleeping long enough that it's likely that the shutdown loop can consume the entire queue before a worker thread wakes up again and comes back to look for more work to do. But there's no guarantee of that. And the more items on the queue, the more likely a worker will wake up and grab another work item before shutdown() consumes the entire queue.
For example, on my box just now, boosting the number of work items to a million:
Order 0 - Sleeping 0.5 seconds
Order 1 - Sleeping 0.5 seconds
Order 0 - Slept 0.5 seconds
Order 1 - Slept 0.5 seconds
...
Order 36 - Sleeping 0.5 seconds
Order 35 - Slept 0.5 seconds
Order 37 - Sleeping 0.5 seconds
Caught Exception, stopping all future orders
Order 36 - Slept 0.5 seconds
Order 331479 - Sleeping 0.5 seconds
Order 37 - Slept 0.5 seconds
Order 382046 - Sleeping 0.5 seconds
Order 331479 - Slept 0.5 seconds
Order 792063 - Sleeping 0.5 seconds
Order 382046 - Slept 0.5 seconds
Order 837591 - Sleeping 0.5 seconds
Order 792063 - Slept 0.5 seconds
Order 837591 - Slept 0.5 seconds
The queue was so large in this case that worker threads woke up and grabbed new items multiple times while shutdown() was working on draining the queue.
You can't stop this, but you should learn to live with it ;-) In a parallel world, calling .submit() just says you want an item to be executed sometime. You shouldn't assume anything about when it will be executed (for example, you shouldn't even assume that items passed to .submit() before will be executed before the new item - there's no guarantee of that either), or that it will be possible to cancel it before it runs. Any kind of synchronized, deterministic behavior you may want needs to be forced with appropriate extra app-specific synchronization code (locks, queues, semaphores, events, condition variables, ...).
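For instance, one common pattern (a sketch of the "extra app-specific synchronization" idea, using a hypothetical module-level threading.Event) is cooperative cancellation: a work item a worker has already dequeued can no longer be cancelled via its future, but it can check a shared flag itself and bail out before doing any real work.

```python
import concurrent.futures
import threading
import time

stop_event = threading.Event()   # hypothetical app-level "stop everything" flag
executed = []                    # orders that actually ran (list.append is thread-safe)

def sleep_test(order_number):
    # Cooperative cancellation: even if this item was grabbed by a worker
    # during the shutdown race, it exits immediately once the flag is set.
    if stop_event.is_set():
        return
    executed.append(order_number)
    time.sleep(0.01)
    if order_number == 4:
        raise Exception("Reached order #4")

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(sleep_test, n) for n in range(10_000)]
        for future in futures:
            if future.cancelled():
                continue
            try:
                future.result()
            except Exception:
                # Set the flag BEFORE cancelling, so any item a worker grabs
                # during the race window bails out anyway.
                stop_event.set()
                executor.shutdown(wait=False, cancel_futures=True)

main()
```

This doesn't eliminate the race over the work queue; it just makes losing the race harmless, because a "stray" item that slips past cancellation no longer does anything.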