
Python 3: How to submit an async function to a threadPool?

I want to use both ThreadPoolExecutor from concurrent.futures and async functions.

My program repeatedly submits a function with different input values to a thread pool. The final sequence of tasks that are executed in that larger function can be in any order, and I don't care about the return value, just that they execute at some point in the future.

So I tried to do this

async def startLoop():

    while 1:
        for item in clients:
            arrayOfFutures.append(await config.threadPool.submit(threadWork, obj))

        wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)

where the function submitted is:

async def threadWork(obj):
   bool = do_something() # needs to execute before next functions
   if bool:
       do_a() # can be executed at any time
       do_b() # ^

where do_b and do_a are async functions. The problem with this is that I get the error: TypeError: object Future can't be used in 'await' expression, and if I remove the await, I get another error saying I need to add await.

I guess I could make everything use threads, but I don't really want to do that.

Manu Masson asked Feb 19 '19 04:02





1 Answer

I recommend a careful readthrough of Python 3's asyncio development guide, particularly the "Concurrency and Multithreading" section.

The main conceptual issue in your example is that event loops are single-threaded, so it doesn't make sense to execute an async coroutine in a thread pool. There are a few ways for event loops and threads to interact:

  • Event loop per thread. For example:

     async def threadWorkAsync(obj):
         b = do_something()
         if b:
             # Run a and b as concurrent tasks
             task_a = asyncio.create_task(do_a())
             task_b = asyncio.create_task(do_b())
             await task_a
             await task_b
    
     def threadWork(obj):
         # Create an event loop for this thread and block until completion
         asyncio.run(threadWorkAsync(obj))
    
     def startLoop():
         while 1:
             arrayOfFutures = []
             for item in clients:
                 arrayOfFutures.append(config.threadPool.submit(threadWork, item))
    
             wait(arrayOfFutures, timeout=None, return_when=ALL_COMPLETED)
    
  • Execute blocking code in an executor. This gives you asyncio futures, which can be awaited, instead of the concurrent.futures futures used above.

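     async def startLoop():
         # run_in_executor is a method of the event loop, not of the asyncio module
         loop = asyncio.get_running_loop()
         while 1:
             arrayOfFutures = []
             for item in clients:
                 arrayOfFutures.append(loop.run_in_executor(
                     config.threadPool, threadWork, item))
    
             await asyncio.gather(*arrayOfFutures)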
    
  • Use threadsafe functions to submit tasks to event loops across threads. For example, instead of creating a run loop for each thread you could run all async coroutines in the main thread's run loop:

     def threadWork(obj, loop):
         b = do_something()
         if b:
             future_a = asyncio.run_coroutine_threadsafe(do_a(), loop)
             future_b = asyncio.run_coroutine_threadsafe(do_b(), loop)
             concurrent.futures.wait([future_a, future_b])
    
     async def startLoop():
         loop = asyncio.get_running_loop()
         while 1:
             arrayOfFutures = []
             for item in clients:
                 arrayOfFutures.append(loop.run_in_executor(
                     config.threadPool, threadWork, item, loop))
    
             await asyncio.gather(*arrayOfFutures)
    

    Note: This example should not be used literally as it will result in all coroutines executing in the main thread while the thread pool workers just block. This is just to show an example of the run_coroutine_threadsafe() method.
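For reference, here is a minimal self-contained sketch that combines the first two approaches: each worker thread runs its own event loop via asyncio.run(), and the caller awaits the workers through loop.run_in_executor(). The bodies of do_something, do_a and do_b are hypothetical stand-ins (the question does not show them), and the names run_batch and main, the pool size and the client list are assumptions made for illustration only.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for the question's functions.
def do_something(obj):
    # Synchronous check that must finish before do_a / do_b are started.
    return obj % 2 == 0


async def do_a(obj):
    await asyncio.sleep(0.01)
    return f"a:{obj}"


async def do_b(obj):
    await asyncio.sleep(0.01)
    return f"b:{obj}"


async def thread_work_async(obj):
    if not do_something(obj):
        return []
    # do_a and do_b run concurrently on this thread's own event loop.
    return list(await asyncio.gather(do_a(obj), do_b(obj)))


def thread_work(obj):
    # Plain function, so it can be submitted to a thread pool.
    # asyncio.run() creates an event loop in the worker thread and
    # blocks that thread until the coroutine has finished.
    return asyncio.run(thread_work_async(obj))


async def run_batch(pool, clients):
    loop = asyncio.get_running_loop()
    # run_in_executor returns asyncio futures, which can be awaited.
    futures = [loop.run_in_executor(pool, thread_work, item) for item in clients]
    # gather returns the results in the same order as the inputs.
    return await asyncio.gather(*futures)


def main():
    with ThreadPoolExecutor(max_workers=4) as pool:
        return asyncio.run(run_batch(pool, [1, 2, 3, 4]))


if __name__ == "__main__":
    print(main())
```

With these stand-ins, main() returns [[], ['a:2', 'b:2'], [], ['a:4', 'b:4']]. If do_something is cheap and everything else is async, the thread pool may not be needed at all; this sketch only shows how the two kinds of futures fit together.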

augurar answered Oct 17 '22 06:10
