I am using the new concurrent.futures module (which also has a Python 2 backport) to do some simple multithreaded I/O. I am having trouble understanding how to cleanly kill tasks started using this module.
Check out the following Python 2/3 script, which reproduces the behavior I'm seeing:
#!/usr/bin/env python
from __future__ import print_function
import concurrent.futures
import time

def control_c_this():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future1 = executor.submit(wait_a_bit, name="Jack")
        future2 = executor.submit(wait_a_bit, name="Jill")
        for future in concurrent.futures.as_completed([future1, future2]):
            future.result()
        print("All done!")

def wait_a_bit(name):
    print("{n} is waiting...".format(n=name))
    time.sleep(100)

if __name__ == "__main__":
    control_c_this()
While this script is running it appears impossible to kill cleanly using the regular Control-C keyboard interrupt. I am running on OS X. I have to use kill from the command line to kill the script; Control-C is just ignored.
Most documentation I've found online talks about how to cleanly kill threads with the old threading module. None of it seems to apply here.
And all the methods provided within the concurrent.futures module to stop stuff (like Executor.shutdown() and Future.cancel()) only work when the Futures haven't started yet or are complete, which is pointless in this case. I want to interrupt the Future immediately.
My use case is simple: When the user hits Control-C, the script should exit immediately like any well-behaved script does. That's all I want.
So what's the proper way to get this behavior when using concurrent.futures?
The ThreadPoolExecutor in Python provides a thread pool that lets you run tasks concurrently. You add a task to the pool by calling submit() with your function, which returns a Future object. Calling cancel() on that Future cancels the task, but only if it has not yet started running.
The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor.
A Future object is returned from an Executor, such as a ThreadPoolExecutor, when calling submit() to dispatch a task for asynchronous execution. In general, we do not create Future objects ourselves; we only receive them and call methods on them.
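To make the "only before it has started" limitation concrete, here is a minimal sketch. The names demo_cancel and blocker are made up for illustration, and the pool is limited to a single worker on purpose so that the second task is guaranteed to still be queued when cancel() is called.

```python
import concurrent.futures
import threading


def demo_cancel():
    """Return what cancel() reports for a running task and for a queued one."""
    started = threading.Event()
    release = threading.Event()

    def blocker():
        started.set()    # tell the main thread we are now running
        release.wait()   # stay "running" until the main thread lets us go
        return "ran"

    # A single worker guarantees the second task has to wait in the queue.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        running = executor.submit(blocker)
        started.wait()
        pending = executor.submit(print, "this is never printed")

        cancelled_running = running.cancel()  # already running: cannot be cancelled
        cancelled_pending = pending.cancel()  # still queued: cancelled
        release.set()

    return cancelled_running, cancelled_pending, running.result(), pending.cancelled()
```

The running task reports False from cancel() and still runs to completion, which is exactly the behavior the question complains about.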
It's kind of painful. Essentially, your worker threads have to finish before your main thread can exit; you cannot exit unless they do. The typical workaround is to have some global state that each thread checks to decide whether it should do more work.
Here's the quote explaining why. In essence, if threads exited when the interpreter does, bad things could happen.
Here's a working example. Note that C-c takes at most 1 sec to propagate because of the sleep duration of the child thread.
#!/usr/bin/env python
from __future__ import print_function
import concurrent.futures
import time

# global flag that every worker polls; set it to True to ask them to stop
quit = False

def wait_a_bit(name):
    while not quit:
        print("{n} is doing work...".format(n=name))
        time.sleep(1)

def setup():
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    future1 = executor.submit(wait_a_bit, "Jack")
    future2 = executor.submit(wait_a_bit, "Jill")

    # main thread must be doing "work" to be able to catch a Ctrl+C
    # http://www.luke.maurits.id.au/blog/post/threads-and-signals-in-python.html
    while not (future1.done() and future2.done()):
        time.sleep(1)

if __name__ == "__main__":
    try:
        setup()
    except KeyboardInterrupt:
        # module-level assignment, so the workers see the new value and return
        quit = True
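A variation on the same idea, sketched with a threading.Event in place of the global boolean. This is not the code from the answer above; start_workers, stop_workers and poll_interval are hypothetical names. The one practical difference is that Event.wait() returns as soon as the event is set, so the workers do not have to sit out the rest of their sleep.

```python
import concurrent.futures
import threading


def start_workers(stop_event, names, poll_interval=0.05):
    """Start one looping worker per name; each runs until stop_event is set."""

    def worker(name):
        iterations = 0
        while not stop_event.is_set():
            iterations += 1  # stand-in for one unit of real work
            # Event.wait() sleeps like time.sleep(), but returns early
            # as soon as the event is set.
            stop_event.wait(poll_interval)
        return name, iterations

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(names))
    futures = [executor.submit(worker, name) for name in names]
    return executor, futures


def stop_workers(stop_event, executor, futures):
    """Signal the workers, wait for them to return, and collect their results."""
    stop_event.set()
    executor.shutdown(wait=True)
    return [future.result() for future in futures]
```

In a script, the main thread would keep polling inside a try block (as setup() does above) and call stop_workers(...) from the except KeyboardInterrupt handler.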
I encountered this, but the issue I had was that many futures (tens of thousands) would be waiting to run, and just pressing Ctrl-C left them waiting rather than actually exiting. I was using concurrent.futures.wait to run a progress loop and needed to add a try ... except KeyboardInterrupt to handle cancelling unfinished Futures.
import concurrent.futures

# MAX_WORKERS, do_work and large_set_to_do_work_over are defined elsewhere
POLL_INTERVAL = 5
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    futures = [pool.submit(do_work, arg) for arg in large_set_to_do_work_over]
    # next line returns instantly
    done, not_done = concurrent.futures.wait(futures, timeout=0)
    try:
        while not_done:
            # next line 'sleeps' this main thread, letting the thread pool run
            freshly_done, not_done = concurrent.futures.wait(not_done, timeout=POLL_INTERVAL)
            done |= freshly_done
            # more polling stats calculated here and printed every POLL_INTERVAL seconds...
    except KeyboardInterrupt:
        # only futures that are not done will prevent exiting
        for future in not_done:
            # cancel() returns False if it's already done or currently running,
            # and True if was able to cancel it; we don't need that return value
            _ = future.cancel()
        # wait for running futures that the above for loop couldn't cancel (note timeout)
        _ = concurrent.futures.wait(not_done, timeout=None)
If you're not interested in keeping exact track of what got done and what didn't (i.e. don't want a progress loop), you can replace the first wait call (the one with timeout=0) with not_done = futures and still leave the while not_done: logic.
The for future in not_done: cancel loop can probably behave differently based on that return value (or be written as a comprehension), but waiting for futures that are done or canceled isn't really waiting - it returns instantly. The last wait with timeout=None ensures that pool's running jobs really do finish.
Again, this only works correctly if the do_work that's being called actually, eventually returns within a reasonable amount of time. That was fine for me - in fact, I want to be sure that if do_work gets started, it runs to completion. If do_work is 'endless' then you'll need something like cdosborn's answer that uses a variable visible to all the threads, signaling them to stop themselves.
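For what it's worth, on Python 3.9 and later Executor.shutdown() accepts cancel_futures=True, which drops everything still sitting in the queue in one call. The sketch below assumes such a Python version; shutdown_dropping_queue and the gate/started events are made-up names used only to make the demonstration deterministic. Like the cancel loop above, it does not interrupt a task that is already running.

```python
import concurrent.futures
import threading


def shutdown_dropping_queue(task_count=20):
    """Queue task_count tasks on one worker, then shut down as an interrupt handler might."""
    started = threading.Event()
    gate = threading.Event()

    def do_work(i):
        started.set()
        gate.wait()  # hypothetical stand-in for real work
        return i

    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    futures = [pool.submit(do_work, i) for i in range(task_count)]
    started.wait()  # task 0 is running; the rest are still queued

    # What an `except KeyboardInterrupt:` handler could call (Python 3.9+).
    pool.shutdown(wait=False, cancel_futures=True)

    gate.set()  # let the one running task finish
    first_result = futures[0].result()
    cancelled = sum(f.cancelled() for f in futures[1:])
    return first_result, cancelled
```

The task that had already started still returns its result; every queued task ends up cancelled without ever running.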
Late to the party, but I just had the same problem.
I want to kill my program immediately and I don't care what's going on. I don't need a clean shutdown beyond what Linux will do.
I found that replacing geitda's code in the KeyboardInterrupt exception handler with os.kill(os.getpid(), 9) exits immediately after the first ^C.