I would like to submit jobs from a thread to an asyncio
event loop (just like run_in_executor but the other way around).
Here's what the asyncio
documentation says about concurrency and multithreading:
To schedule a callback from a different thread, the BaseEventLoop.call_soon_threadsafe() method should be used. Example to schedule a coroutine from a different thread:
loop.call_soon_threadsafe(asyncio.async, coro_func())
That works fine but the result of the coroutine is lost.
Instead, it is possible to use a function that adds a done callback to the future returned by async
(or ensure_future
) so that the thread can access the result through a concurrent.futures.Future.
Is there a particular reason why such a feature is not implemented in the standard library? Or did I miss a simpler way to achieve that?
The event loop is the core of every asyncio application. Event loops run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses. Application developers should typically use the high-level asyncio functions, such as asyncio.
It should be used as a main entry point for asyncio programs, and should ideally only be called once. New in version 3.7.
new_event_loop() − This method will create and return a new event loop object. loop. run_forever() − This method will run until stop() method is called.
stop() – the stop function stops a running loop. is_running() – this function checks if the event loop is currently running or not. is_closed() – this function checks if the event loop is closed or not. close() – the close function closes the event loop.
My request made its way and a run_coroutine_threadsafe function has been implemented here.
Example:
def target(loop, timeout=None):
future = asyncio.run_coroutine_threadsafe(add(1, b=2), loop)
return future.result(timeout)
async def add(a, b):
await asyncio.sleep(1)
return a + b
loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, target, loop)
assert loop.run_until_complete(future) == 3
I originally posted a sub-class of concurrent.futures.Executor that can still be implemented as:
class LoopExecutor(concurrent.futures.Executor):
"""An Executor subclass that uses an event loop
to execute calls asynchronously."""
def __init__(self, loop=None):
"""Initialize the executor with a given loop."""
self.loop = loop or asyncio.get_event_loop()
def submit(self, fn, *args, **kwargs):
"""Schedule the callable, fn, to be executed as fn(*args **kwargs).
Return a Future object representing the execution of the callable."""
coro = asyncio.coroutine(fn)(*args, **kwargs)
return asyncio.run_coroutine_threadsafe(coro, self.loop)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With