 

Why can't coroutines be used with run_in_executor?


I want to run a service that requests URLs using coroutines and multiple threads. However, I cannot pass coroutines to the workers in the executor. See the code below for a minimal example of the issue:

import time
import asyncio
import concurrent.futures

EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=5)

async def async_request(loop):
    await asyncio.sleep(3)

def sync_request(_):
    time.sleep(3)

async def main(loop):
    futures = [loop.run_in_executor(EXECUTOR, async_request,loop) 
               for x in range(10)]

    await asyncio.wait(futures)

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

Resulting in the following error:

Traceback (most recent call last):
  File "co_test.py", line 17, in <module>
    loop.run_until_complete(main(loop))
  File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "co_test.py", line 10, in main
    futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)]
  File "co_test.py", line 10, in <listcomp>
    futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)]
  File "/usr/lib/python3.5/asyncio/base_events.py", line 541, in run_in_executor
    raise TypeError("coroutines cannot be used with run_in_executor()")
TypeError: coroutines cannot be used with run_in_executor()

I know that I could use the sync_request function instead of async_request; in that case I would get concurrency by sending the blocking function to another thread.
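A minimal sketch of that blocking alternative (same names as my example above; the one-second sleep is just a stand-in for a real request):

```python
import time
import asyncio
import concurrent.futures

EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=5)

def sync_request(_):
    # blocking stand-in for a real HTTP request
    time.sleep(1)
    return "done"

async def main(loop):
    # each blocking call runs in a worker thread, so the
    # event loop itself is never blocked
    futures = [loop.run_in_executor(EXECUTOR, sync_request, None)
               for _ in range(10)]
    return await asyncio.gather(*futures)

loop = asyncio.new_event_loop()
results = loop.run_until_complete(main(loop))
loop.close()
print(results)
```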

I also know that I could call async_request ten times in the event loop, something like the code below:

loop = asyncio.get_event_loop()
futures = [async_request(loop) for i in range(10)]
loop.run_until_complete(asyncio.wait(futures))

But in this case I would be using a single thread.

How could I combine both scenarios, i.e. have the coroutines working across multiple threads? As you can see in the code, I am passing (but not using) the loop to async_request, in the hope that I can code something that tells the worker to make a future, send it to the pool and asynchronously (freeing the worker) wait for the result.

The reason I want to do this is to make the application scalable. Or is that an unnecessary step? Should I simply have one thread per URL and be done with it? Something like:

LEN = len(list_of_urls)
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=LEN)

is good enough?

asked Sep 06 '17 by zeh


3 Answers

You have to create and set a new event loop in the thread context in order to run coroutines:

import asyncio
from concurrent.futures import ThreadPoolExecutor


def run(corofn, *args):
    loop = asyncio.new_event_loop()
    try:
        coro = corofn(*args)
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)
    finally:
        loop.close()


async def main():
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(max_workers=5)
    futures = [
        loop.run_in_executor(executor, run, asyncio.sleep, 1, x)
        for x in range(10)]
    print(await asyncio.gather(*futures))
    # Prints: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
answered Oct 23 '22 by Vincent


From what I understood from the question, you are trying to use each thread to:

  • trigger a coroutine execution
  • be free to receive more coroutines to trigger
  • wait for everything to end in an asynchronous way

However, as soon as you call the loop (be it the main loop or a new one) to wait for results, it blocks that thread while it waits.

And by using run_in_executor with a bunch of sync functions, the thread doesn't actually know whether there are more coroutines to dispatch in one go before reaching the point where it waits on the loop.

If you want to dispatch a bunch of coroutines in such a way that each thread manages its own group of coroutines in its own event loop, the following code achieves a total time of 1 second for a multithreaded wait on 10 async sleeps of 1 second each.

import asyncio
import threading
from asyncio import AbstractEventLoop
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter
from typing import Dict, Set

import _asyncio

event_loops_for_each_thread: Dict[int, AbstractEventLoop] = {}


def run(corofn, *args):
    curr_thread_id = threading.current_thread().ident

    if curr_thread_id not in event_loops_for_each_thread:
        event_loops_for_each_thread[curr_thread_id] = asyncio.new_event_loop()

    thread_loop = event_loops_for_each_thread[curr_thread_id]
    coro = corofn(*args)
    return thread_loop.create_task(coro)


async def async_gather_tasks(all_tasks: Set[_asyncio.Task]):
    return await asyncio.gather(*all_tasks)


def wait_loops():
    # each thread will block waiting for all async calls of its specific event loop
    curr_thread_id = threading.current_thread().ident
    threads_event_loop = event_loops_for_each_thread[curr_thread_id]

    # I print the following to prove that each thread is waiting on its loop
    print(f'Thread {curr_thread_id} will wait its tasks.')
    return threads_event_loop.run_until_complete(
        async_gather_tasks(asyncio.all_tasks(threads_event_loop)))


async def main():
    loop = asyncio.get_event_loop()
    max_workers = 5
    executor = ThreadPoolExecutor(max_workers=max_workers)

    # dispatching async tasks to each thread.
    futures = [
        loop.run_in_executor(executor, run, asyncio.sleep, 1, x)
        for x in range(10)]

    # waiting for the threads to finish dispatching the async executions
    # to their own event loops
    await asyncio.wait(futures)

    # at this point the async events were dispatched to each thread's event loop

    # in the lines below, you tell each worker thread to wait for all
    # its async tasks to complete.
    futures = [
        loop.run_in_executor(executor, wait_loops)
        for _ in range(max_workers)
    ]

    print(await asyncio.gather(*futures))
    # it will print something like:
    # [[1, 8], [0], [6, 3, 9, 7], [4], [2, 5]]
    # each sub-list is the result of the tasks of one thread
    # it is non-deterministic, so it will return a different array of arrays
    # each time you run it.


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    start = perf_counter()
    loop.run_until_complete(main())
    end = perf_counter()
    duration_s = end - start
    # the print below proves that all threads are waiting on their tasks asynchronously
    print(f'duration_s={duration_s:.3f}')
answered Oct 23 '22 by Tonsic


I just wanted to write a similar answer to Tonsic's on how asyncio should actually be used in this situation, but much more succinctly (using some newer asyncio features as well).

What you're really looking for in this case is asyncio.gather, which lets you run many coroutines concurrently.

From your example, it should thus become:

async def async_request():
    await asyncio.sleep(3)

async def main():
    await asyncio.gather(*[async_request() for _ in range(10)])

Now when we time it, it takes about 3 seconds, as desired, instead of 30 seconds:

>>> from time import time
>>> start = time()
>>> asyncio.run(main())
>>> time() - start
3.00907039642334

Furthermore, when using concurrent.futures alongside asyncio, you should identify which blocking code actually needs an executor and apply it only there, turning just that code into asynchronous code.

async def async_request():
    # The default executor is a `ThreadPoolExecutor`.
    # In python >= 3.9, this can be shortened to `asyncio.to_thread(sync_request)`.
    await asyncio.get_running_loop().run_in_executor(None, sync_request)

From that point on, you can manage your executors by treating these calls as coroutines with asyncio, using things like asyncio.gather, as shown at the start.
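Putting the two pieces together, a sketch (assuming Python >= 3.9 for asyncio.to_thread; the URL list and the one-second sleep standing in for a real request are made up for illustration):

```python
import time
import asyncio

def sync_request(url):
    # blocking stand-in for a real HTTP fetch
    time.sleep(1)
    return url

async def async_request(url):
    # run the blocking call in the default ThreadPoolExecutor
    return await asyncio.to_thread(sync_request, url)

async def main(urls):
    # treat each executor-backed call as a coroutine and gather them
    return await asyncio.gather(*(async_request(u) for u in urls))

urls = [f"https://example.com/{i}" for i in range(10)]
results = asyncio.run(main(urls))
print(results)
```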

answered Oct 23 '22 by Simply Beautiful Art