How to run a coroutine and wait for its result from a sync function when the loop is running?

I have code like the following:

def render():
    loop = asyncio.get_event_loop()

    async def test():
        await asyncio.sleep(2)
        print("hi")
        return 200

    if loop.is_running():
        result = asyncio.ensure_future(test())
    else:
        result = loop.run_until_complete(test())

When the loop is not running, this is easy: loop.run_until_complete runs the coroutine and returns its result. But if the loop is already running (my blocking code is called from an app that is already running the loop), I cannot use loop.run_until_complete, since it raises an exception. When I call asyncio.ensure_future, the task gets scheduled and runs, but I want to wait right there for the result. Does anybody know how to do this? The docs are not very clear on it.
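To make the failure concrete, here is a minimal sketch of what happens when synchronous code calls loop.run_until_complete while the loop is already running. The names blocking_helper and main are hypothetical and exist only for this illustration.

```python
import asyncio


def blocking_helper():
    """Synchronous code that is called while the loop is already running."""
    loop = asyncio.get_running_loop()
    coro = asyncio.sleep(0)
    try:
        # Re-entering a running loop is refused with a RuntimeError.
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(exc)
    return "no error"


async def main():
    # The loop is running here, so the helper cannot drive it again.
    return blocking_helper()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The helper returns the text of the RuntimeError instead of a result, which is the exception mentioned above.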

I tried passing a concurrent.futures.Future to the coroutine, calling set_result inside it, and then calling Future.result() in my blocking code, but that doesn't work: it blocks there and does not let anything else run. Any help would be appreciated.

Ordani Sanchez asked Mar 16 '18 23:03




3 Answers

Waiting Synchronously for an Asynchronous Coroutine

If the asyncio event loop was started with loop.run_forever, that call blocks the executing thread until loop.stop is called [see the docs]. Therefore, the only way to wait synchronously is to run the event loop on a dedicated thread, schedule the asynchronous function on that loop, and wait for it synchronously from another thread.

For this I have composed my own minimal solution following the answer by user4815162342. I have also added the parts for cleaning up the loop when all work is finished [see loop.close].

The main function in the code below runs the event loop on a dedicated thread and schedules several tasks on it, plus the task whose result is to be awaited synchronously. The synchronous wait blocks until the desired result is ready. Finally, the loop is closed and cleaned up gracefully along with its thread.

The dedicated thread and the functions stop_loop, run_forever_safe, and await_sync can be encapsulated in a module or a class.
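As a rough idea of what such an encapsulation could look like, here is a minimal sketch. The class name LoopThread and its methods are hypothetical, not part of asyncio, and the sketch skips the task-cancellation cleanup that the full code performs.

```python
import asyncio
import threading


class LoopThread:
    """Hypothetical helper: owns an event loop that runs on its own thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        try:
            self._loop.run_forever()  # returns after loop.stop()
        finally:
            self._loop.close()

    def await_sync(self, coro, timeout=None):
        """Schedule coro on the loop thread and block the caller for its result."""
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)

    def close(self):
        """Stop the loop from the outside and wait for its thread to finish."""
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()


async def compute():
    await asyncio.sleep(0.05)
    return 200


if __name__ == "__main__":
    runner = LoopThread()
    print(runner.await_sync(compute()))
    runner.close()
```

The caller must not itself be running on the loop thread, otherwise await_sync would block the loop it is waiting on.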

For thread-safety considerations, see the section “Concurrency and Multithreading” in the asyncio docs.
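The main point of that section, as I understand it, is that code outside the loop's thread should go through loop.call_soon_threadsafe (or asyncio.run_coroutine_threadsafe) rather than calling loop methods such as call_soon directly. A small sketch, with the hypothetical function name demo_threadsafe_scheduling:

```python
import asyncio
import threading


def demo_threadsafe_scheduling():
    loop = asyncio.new_event_loop()
    seen = []

    thread = threading.Thread(target=loop.run_forever)
    thread.start()

    # From a foreign thread, use call_soon_threadsafe; plain loop.call_soon
    # is not thread-safe and may not wake the loop up.
    loop.call_soon_threadsafe(seen.append, "from main thread")
    loop.call_soon_threadsafe(loop.stop)

    thread.join()
    loop.close()
    return seen


if __name__ == "__main__":
    print(demo_threadsafe_scheduling())
```

Callbacks are run in the order they were scheduled, so the append happens before the loop stops.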

import asyncio
import threading
#----------------------------------------

def stop_loop(loop):
    ''' stops an event loop '''
    loop.stop()
    print (".: LOOP STOPPED:", loop.is_running())

def run_forever_safe(loop):
    ''' run a loop for ever and clean up after being stopped '''

    loop.run_forever()
    # NOTE: loop.run_forever returns after calling loop.stop

    #-- cancel all tasks and close the loop gracefully
    print(".: CLOSING LOOP...")
    # source: <https://xinhuang.github.io/posts/2017-07-31-common-mistakes-using-python3-asyncio.html>

    # asyncio.Task.all_tasks was removed in Python 3.9; use asyncio.all_tasks
    loop_tasks_all = asyncio.all_tasks(loop)

    for task in loop_tasks_all: task.cancel()
    # NOTE: `cancel` does not guarantee that the Task will be cancelled

    for task in loop_tasks_all:
        if not (task.done() or task.cancelled()):
            try:
                # wait for task cancellations
                loop.run_until_complete(task)
            except asyncio.CancelledError: pass
    #END for
    print(".: ALL TASKS CANCELLED.")

    loop.close()
    print(".: LOOP CLOSED:", loop.is_closed())

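def await_sync(task):
    ''' synchronously waits for a task '''
    # `task` is the concurrent.futures.Future returned by
    # asyncio.run_coroutine_threadsafe; result() blocks without busy-waiting
    result = task.result()
    print(".: AWAITED TASK DONE")
    return result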
#----------------------------------------

async def asyncTask(loop, k):
    ''' asynchronous task '''
    print("--start async task %s" % k)
    await asyncio.sleep(3)  # the `loop` argument was removed in Python 3.10
    print("--end async task %s." % k)
    key = "KEY#%s" % k
    return key

def main():
    loop = asyncio.new_event_loop() # construct a new event loop

    #-- closures for running and stopping the event-loop
    run_loop_forever = lambda: run_forever_safe(loop)
    close_loop_safe = lambda: loop.call_soon_threadsafe(stop_loop, loop)

    #-- make dedicated thread for running the event loop
    thread = threading.Thread(target=run_loop_forever)

    #-- add some tasks along with my particular task
    myTask = asyncio.run_coroutine_threadsafe(asyncTask(loop, 100200300), loop=loop)
    otherTasks = [asyncio.run_coroutine_threadsafe(asyncTask(loop, i), loop=loop)
                  for i in range(1, 10)]

    #-- begin the thread to run the event-loop
    print(".: EVENT-LOOP THREAD START")
    thread.start()

    #-- _synchronously_ wait for the result of my task
    result = await_sync(myTask) # blocks until task is done
    print("* final result of my task:", result) 

    #... do lots of work ...
    print("*** ALL WORK DONE ***")
    #========================================

    # close the loop gracefully when everything is finished
    close_loop_safe()
    thread.join()
#----------------------------------------

main()
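The await_sync helper above waits for as long as the task takes. If that is a concern, the wait can be bounded, since the future returned by asyncio.run_coroutine_threadsafe accepts a timeout in result(). The following is only a sketch; await_sync_with_timeout and demo are hypothetical names, and returning None on timeout is an arbitrary choice made for the illustration.

```python
import asyncio
import concurrent.futures
import threading


def await_sync_with_timeout(coro, loop, timeout):
    """Block for at most `timeout` seconds; return None if it takes longer."""
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    try:
        return future.result(timeout)
    except concurrent.futures.TimeoutError:
        future.cancel()  # ask the loop to cancel the still-running coroutine
        return None


def demo():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever)
    thread.start()
    try:
        # Takes 5 s but is only given 0.05 s, so it is cancelled.
        slow = await_sync_with_timeout(asyncio.sleep(5, result="late"), loop, 0.05)
        # Finishes well within its 1 s budget.
        fast = await_sync_with_timeout(asyncio.sleep(0.01, result="on time"), loop, 1)
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()
    return slow, fast


if __name__ == "__main__":
    print(demo())
```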
AlQuemist answered Oct 14 '22 06:10



To implement render() with the proposed design, you would need a way to single-step the event loop from a callback running inside it. Asyncio explicitly forbids recursive event loops, so this approach is a dead end.

Given that constraint, you have two options:

  1. make render() itself a coroutine;
  2. execute render() (and its callers) in a thread different than the thread that runs the asyncio event loop.
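For reference, option 1 could look roughly like the sketch below. It assumes every caller of render() can itself be made async, which is often the real obstacle; the short sleep stands in for the work done in the question.

```python
import asyncio


async def test():
    await asyncio.sleep(0.1)
    return 200


async def render():
    # Awaiting hands control back to the running loop instead of blocking it.
    result = await test()
    return result


if __name__ == "__main__":
    print(asyncio.run(render()))
```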

Assuming #1 is out of the question, you can implement the #2 variant of render() like this:

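import asyncio

def render():
    # _event_loop is a module-level reference to the loop that runs in
    # another thread; asyncio.get_event_loop() cannot be used here
    loop = _event_loop

    async def test():
        await asyncio.sleep(2)
        print("hi")
        return 200

    # schedule the coroutine on the loop's thread and block until it finishes
    future = asyncio.run_coroutine_threadsafe(test(), loop)
    result = future.result()
    return result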

Note that you cannot use asyncio.get_event_loop() in render because the event loop is not (and should not be) set for that thread. Instead, the code that spawns the runner thread must call asyncio.get_event_loop() and send it to the thread, or just leave it in a global variable or a shared structure.
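One way to hand the loop over, sketched under the assumption that the thread is spawned from a coroutine already running on the loop. Here render takes the loop as a parameter instead of reading a global, and main is a hypothetical entry point.

```python
import asyncio
import threading


async def test():
    await asyncio.sleep(0.1)
    return 200


def render(loop):
    # Runs in the worker thread; the loop is handed in explicitly.
    future = asyncio.run_coroutine_threadsafe(test(), loop)
    return future.result()


async def main():
    loop = asyncio.get_running_loop()  # safe: we are inside the loop here
    results = []
    worker = threading.Thread(target=lambda: results.append(render(loop)))
    worker.start()
    # Wait for the worker in the default executor so the loop stays free
    # to run test() in the meantime.
    await loop.run_in_executor(None, worker.join)
    return results[0]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Calling worker.join() directly inside main would block the loop and deadlock, because render is waiting for that same loop to run test().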

user4815162342 answered Oct 14 '22 06:10



Here is my case: my whole program is async, but it calls a sync library, which then calls back into my async function.

This follows the answer by user4815162342.

import asyncio

async def asyncTask(k):
    ''' asynchronous task '''
    print("--start async task %s" % k)
    await asyncio.sleep(3)
    print("--end async task %s." % k)
    key = "KEY#%s" % k
    return key


def my_callback():
    print("here i want to call my async func!")
    future = asyncio.run_coroutine_threadsafe(asyncTask(1), LOOP)
    return future.result()

def sync_third_lib(cb):
    print("here will call back to your code...")
    cb()

async def main():
    print("main start...")

    print("call sync third lib ...")
    await asyncio.to_thread(sync_third_lib, my_callback)
    # before Python 3.9 (no asyncio.to_thread), the equivalent is:
    # await asyncio.get_running_loop().run_in_executor(None, sync_third_lib, my_callback)
    print("another work...keep async...")
    await asyncio.sleep(2)

    print("done!")


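# create the loop explicitly; calling asyncio.get_event_loop() with no
# running loop is deprecated in recent Python versions
LOOP = asyncio.new_event_loop()
try:
    LOOP.run_until_complete(main())
finally:
    LOOP.close()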
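If the global LOOP is undesirable, a variant is to grab the running loop inside main and bind it to the callback. This is a sketch assuming Python 3.9+ (for asyncio.to_thread); async_task is a shortened stand-in for asyncTask above, and sync_third_lib stands in for the real third-party library.

```python
import asyncio
import functools


async def async_task(k):
    await asyncio.sleep(0.1)
    return "KEY#%s" % k


def my_callback(loop):
    # Runs in the worker thread; blocks until the coroutine finishes on the loop.
    future = asyncio.run_coroutine_threadsafe(async_task(1), loop)
    return future.result()


def sync_third_lib(cb):
    # Stand-in for a synchronous third-party library that calls back.
    return cb()


async def main():
    loop = asyncio.get_running_loop()
    callback = functools.partial(my_callback, loop)  # bind the loop, no global
    return await asyncio.to_thread(sync_third_lib, callback)


if __name__ == "__main__":
    print(asyncio.run(main()))
```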
Ronaldinho answered Oct 14 '22 08:10
