Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python ThreadPoolExecutor - is the callback guaranteed to run in the same thread as submitted func?

In the ThreadPoolExecutor (TPE), is the callback always guaranteed to run in the same thread as the submitted function?

For example, I tested this with the following code. I ran it many times and it seemed like func and callback always ran in the same thread.

import concurrent.futures 
import random 
import threading 
import time 

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) 

def func(x): 
    time.sleep(random.random()) 
    return threading.current_thread().name 

def callback(future): 
    time.sleep(random.random()) 
    x = future.result() 
    cur_thread = threading.current_thread().name 
    if (cur_thread != x): 
        print(cur_thread, x) 

print('main thread: %s' % threading.current_thread()) 
for i in range(10000): 
    future = executor.submit(func, i) 
    future.add_done_callback(callback) 

However, it seemed to fail when I removed the time.sleep(random.random()) statements, i.e. at least a few func functions and callbacks did not run in the same thread.

For a project that I am working on, the callback must always run on the same thread as the submitted function, so I wanted to be sure that this is guaranteed by TPE. (And also the results of the test without the random sleep seemed puzzling).

I looked at the source code for executors and it does not seem like we switch the thread to the main thread before we run the callback. But just wanted to be sure.

like image 335
Praveen Gollakota Avatar asked Sep 24 '14 16:09

Praveen Gollakota


People also ask

How does ThreadPoolExecutor work in Python?

ThreadPoolExecutor. ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously. An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

How does concurrent futures ThreadPoolExecutor work?

ThreadPoolExecutor Methods : submit(fn, *args, **kwargs): It runs a callable or a method and returns a Future object representing the execution state of the method. map(fn, *iterables, timeout = None, chunksize = 1) : It maps the method and iterables together immediately and will raise an exception concurrent. futures.

Why is it a good idea to use a ThreadPoolExecutor?

Why Use a ThreadPoolExecutor? ThreadPoolExecutors provide a simple abstraction around spinning up multiple threads and using these threads to perform tasks in a concurrent fashion. Adding threading to your application can help to drastically improve the speed of your application when used in the right context.

Is Python truly multithreaded?

Python is NOT a single-threaded language. Python processes typically use a single thread because of the GIL. Despite the GIL, libraries that perform computationally heavy tasks like numpy, scipy and pytorch utilise C-based implementations under the hood, allowing the use of multiple cores.


1 Answers

The documentation does not guarantee which thread callbacks run in. The only documented guarantee is that callbacks will be run in a thread belonging to the process that added the callback, but that could be any thread, since you're using a ThreadPoolExecutor instead of a ProcessPoolExecutor:

Added callables are called in the order that they were added and are always called in a thread belonging to the process that added them.


In the current ThreadPoolExecutor implementation, the thread a callback executes in depends on the state of the Future at the time the callback is added, and whether or not the Future is cancelled. These are implementation details; you should not rely on them, as they may be different in different Python implementations or different versions, and they are subject to change without notice.

If you add the callback after the Future completes, the callback will execute in whatever thread you called add_done_callback in. You can see this by looking at the add_done_callback source:

def add_done_callback(self, fn):
    """Attaches a callable that will be called when the future finishes.

    Args:
        fn: A callable that will be called with this future as its only
            argument when the future completes or is cancelled. The callable
            will always be called by a thread in the same process in which
            it was added. If the future has already completed or been
            cancelled then the callable will be called immediately. These
            callables are called in the order that they were added.
    """
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)
            return
    fn(self)

If the state of the Future indicates it's cancelled or finished, fn is just immediately called in the current thread of execution. Otherwise, it's added to an internal list of callbacks to run when the Future is complete.

For example:

>>> def func(*args):
...  time.sleep(5)
...  print("func {}".format(threading.current_thread()))
>>> def cb(a): print("cb {}".format(threading.current_thread()))
... 
>>> fut = ex.submit(func)
>>> func <Thread(Thread-1, started daemon 140084551563008)>
>>> fut = e.add_done_callback(cb)
cb <_MainThread(MainThread, started 140084622018368)>

If a future is cancelled by a successful cancel call, then the thread performing the cancellation immediately invokes all callbacks:

def cancel(self):
    """Cancel the future if possible.
    Returns True if the future was cancelled, False otherwise. A future
    cannot be cancelled if it is running or has already completed.
    """
    with self._condition:
        if self._state in [RUNNING, FINISHED]:
            return False

        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            return True

        self._state = CANCELLED
        self._condition.notify_all()

    self._invoke_callbacks()
    return True

Otherwise, callbacks are invoked by the thread that executes the future's task.

like image 51
dano Avatar answered Oct 14 '22 05:10

dano