Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Submit a job to an asyncio event loop

I would like to submit jobs from a thread to an asyncio event loop (just like run_in_executor but the other way around).

Here's what the asyncio documentation says about concurrency and multithreading:

To schedule a callback from a different thread, the BaseEventLoop.call_soon_threadsafe() method should be used. Example to schedule a coroutine from a different thread: loop.call_soon_threadsafe(asyncio.async, coro_func())

That works fine but the result of the coroutine is lost.

Instead, it is possible to use a function that adds a done callback to the future returned by async (or ensure_future) so that the thread can access the result through a concurrent.futures.Future.

Is there a particular reason why such a feature is not implemented in the standard library? Or did I miss a simpler way to achieve that?

like image 642
Vincent Avatar asked Sep 15 '15 13:09

Vincent


People also ask

How does the Asyncio event loop work?

The event loop is the core of every asyncio application. Event loops run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses. Application developers should typically use the high-level asyncio functions, such as asyncio.

How many times should Asyncio run () be called?

It should be used as a main entry point for asyncio programs, and should ideally only be called once. New in version 3.7.

Which way among them is used to create an event loop in Python?

new_event_loop() − This method will create and return a new event loop object. loop. run_forever() − This method will run until stop() method is called.

How do I close Asyncio event loop?

stop() – the stop function stops a running loop. is_running() – this function checks if the event loop is currently running or not. is_closed() – this function checks if the event loop is closed or not. close() – the close function closes the event loop.


1 Answers

My request made its way and a run_coroutine_threadsafe function has been implemented here.

Example:

def target(loop, timeout=None):
    future = asyncio.run_coroutine_threadsafe(add(1, b=2), loop)
    return future.result(timeout)

async def add(a, b):
    await asyncio.sleep(1)
    return a + b

loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, target, loop)
assert loop.run_until_complete(future) == 3

I originally posted a sub-class of concurrent.futures.Executor that can still be implemented as:

class LoopExecutor(concurrent.futures.Executor):
    """An Executor subclass that uses an event loop 
    to execute calls asynchronously."""

    def __init__(self, loop=None):
        """Initialize the executor with a given loop."""
        self.loop = loop or asyncio.get_event_loop()

    def submit(self, fn, *args, **kwargs):
        """Schedule the callable, fn, to be executed as fn(*args **kwargs).
        Return a Future object representing the execution of the callable."""
        coro = asyncio.coroutine(fn)(*args, **kwargs)
        return asyncio.run_coroutine_threadsafe(coro, self.loop)
like image 178
Vincent Avatar answered Oct 05 '22 01:10

Vincent