In the ThreadPoolExecutor (TPE), is the callback always guaranteed to run in the same thread as the submitted function?
For example, I tested this with the following code. I ran it many times, and func and callback always seemed to run in the same thread.
import concurrent.futures
import random
import threading
import time

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)

def func(x):
    time.sleep(random.random())
    return threading.current_thread().name

def callback(future):
    time.sleep(random.random())
    x = future.result()
    cur_thread = threading.current_thread().name
    if cur_thread != x:
        print(cur_thread, x)

print('main thread: %s' % threading.current_thread())
for i in range(10000):
    future = executor.submit(func, i)
    future.add_done_callback(callback)
However, when I removed the time.sleep(random.random()) statements, the test seemed to fail: for at least a few submissions, func and its callback did not run in the same thread.
For a project that I am working on, the callback must always run on the same thread as the submitted function, so I want to be sure that TPE guarantees this. (The results of the test without the random sleep also seemed puzzling.)
I looked at the source code for the executors, and it does not seem to switch to the main thread before running the callback, but I just wanted to be sure.
The documentation does not guarantee which thread callbacks run in. The only documented guarantee is that callbacks will be run in a thread belonging to the process that added the callback, but that could be any thread, since you're using a ThreadPoolExecutor instead of a ProcessPoolExecutor:
Added callables are called in the order that they were added and are always called in a thread belonging to the process that added them.
In the current ThreadPoolExecutor implementation, the thread a callback executes in depends on the state of the Future at the time the callback is added, and on whether or not the Future is cancelled. These are implementation details; you should not rely on them, as they may differ between Python implementations or versions, and they are subject to change without notice.
If you add the callback after the Future completes, the callback will execute in whatever thread called add_done_callback. You can see this by looking at the add_done_callback source:
def add_done_callback(self, fn):
    """Attaches a callable that will be called when the future finishes.

    Args:
        fn: A callable that will be called with this future as its only
            argument when the future completes or is cancelled. The callable
            will always be called by a thread in the same process in which
            it was added. If the future has already completed or been
            cancelled then the callable will be called immediately. These
            callables are called in the order that they were added.
    """
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)
            return
    fn(self)
If the state of the Future indicates it's cancelled or finished, fn is just called immediately in the current thread of execution. Otherwise, it's added to an internal list of callbacks to run when the Future is complete.
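Both branches can be checked with a small self-contained script. This is a sketch that assumes CPython's current behaviour (an implementation detail, as noted above). A threading.Event holds the second task open so it cannot finish before its callback is attached; the thread_name_prefix value "worker" and the helper name record_thread are arbitrary illustrative choices.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def record_thread(results, key):
    """Illustrative helper: return a callback that stores its thread's name."""
    def callback(future):
        results[key] = threading.current_thread().name
    return callback

results = {}
release = threading.Event()

with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as executor:
    # Case 1: the future has already finished when the callback is added,
    # so add_done_callback() calls it immediately in this (calling) thread.
    done_future = executor.submit(lambda: "done")
    done_future.result()  # block until the task has finished
    done_future.add_done_callback(record_thread(results, "finished"))

    # Case 2: the future is still pending/running when the callback is added.
    # The task blocks on the Event, so it cannot finish before
    # add_done_callback() appends the callback to the internal list.
    pending_future = executor.submit(release.wait)
    pending_future.add_done_callback(record_thread(results, "pending"))
    release.set()  # let the worker finish; the worker then runs the callback

print(results["finished"])  # the calling thread, e.g. MainThread in a script
print(results["pending"])   # worker_0
```

Without the Event, case 2 becomes a race: if the task happens to finish before add_done_callback runs, the callback silently falls into case 1.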
For example:
>>> import threading, time
>>> from concurrent.futures import ThreadPoolExecutor
>>> ex = ThreadPoolExecutor()
>>> def func(*args):
...     time.sleep(5)
...     print("func {}".format(threading.current_thread()))
...
>>> def cb(a): print("cb {}".format(threading.current_thread()))
...
>>> fut = ex.submit(func)
>>> func <Thread(Thread-1, started daemon 140084551563008)>
>>> fut.add_done_callback(cb)
cb <_MainThread(MainThread, started 140084622018368)>
If a future is cancelled by a successful cancel call, the thread performing the cancellation immediately invokes all callbacks:
def cancel(self):
    """Cancel the future if possible.

    Returns True if the future was cancelled, False otherwise. A future
    cannot be cancelled if it is running or has already completed.
    """
    with self._condition:
        if self._state in [RUNNING, FINISHED]:
            return False

        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            return True

        self._state = CANCELLED
        self._condition.notify_all()

    self._invoke_callbacks()
    return True
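The cancellation path can be demonstrated the same way. This sketch assumes CPython's current behaviour: with max_workers=1, a first task blocked on a threading.Event keeps the only worker busy, so a second task stays queued (PENDING) and cancel() on it is guaranteed to succeed. The names on_done and callback_threads are illustrative.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

callback_threads = []
release = threading.Event()

def on_done(future):
    # Record which thread runs the callback and whether the future was cancelled.
    callback_threads.append((threading.current_thread().name, future.cancelled()))

with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as executor:
    # Occupy the only worker so the next task stays queued (PENDING).
    blocker = executor.submit(release.wait)
    queued = executor.submit(print, "never printed")
    queued.add_done_callback(on_done)

    # cancel() succeeds because the queued task has not started, and it
    # invokes the callback immediately, in the thread calling cancel().
    cancelled = queued.cancel()
    release.set()  # unblock the worker so shutdown() can finish

print(cancelled)         # True
print(callback_threads)  # [('MainThread', True)] when run as a script
```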
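Otherwise, callbacks are invoked by the worker thread that executes the future's task, right after it stores the result or exception. This also explains the puzzling test results in the question: without the random sleep, some calls to func finish before add_done_callback is called, so those callbacks run in the main thread instead of a worker thread.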
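Since none of these paths is guaranteed, one way to meet the requirement in the question (callback on the same thread as the submitted function) is to avoid add_done_callback entirely and call the callback at the end of the submitted callable. The helper run_with_callback below is hypothetical and not part of concurrent.futures; this is a minimal sketch, assuming the callback only needs func's return value rather than a Future.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def run_with_callback(fn, callback, *args, **kwargs):
    """Hypothetical helper: run fn, then callback, in the same worker thread.

    callback receives fn's return value (not a Future). If fn raises,
    callback is skipped and the exception is stored in the Future.
    """
    result = fn(*args, **kwargs)
    callback(result)
    return result

def func(x):
    return threading.current_thread().name

mismatches = []

def callback(func_thread):
    cur_thread = threading.current_thread().name
    if cur_thread != func_thread:
        mismatches.append((cur_thread, func_thread))

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(run_with_callback, func, callback, i)
               for i in range(1000)]
    for f in futures:
        f.result()  # re-raises any exception from func or callback

print(len(mismatches))  # 0
```

One difference from add_done_callback: an exception raised by the callback here ends up in the Future (and is re-raised by result()), whereas exceptions from done-callbacks are only logged.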