Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

await for any future asyncio

I'm trying to use asyncio to handle concurrent network I/O. A very large number of functions are to be scheduled at a single point which vary greatly in time it takes for each to complete. Received data is then processed in a separate process for each output.

The order in which the data is processed is not relevant, so given the potentially very long waiting period for output I'd like to await for whatever future finishes first instead of a predefined order.

def fetch(x):
    sleep()

async def main():
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
    for f in futures:
       await f

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Normally, awaiting in order in which futures were queued is fine:

Well behaved functions profiler graph

Blue color represents time each task is in executor's queue, i.e. run_in_executor has been called, but the function was not yet executed, as the executor runs only 5 tasks simultaneously; green is time spent on executing the function itself; and the red is the time spent waiting for all previous futures to await.

Volatile functions profiler graph

In my case where functions vary in time greatly, there is a lot of time lost on waiting for previous futures in queue to await, while I could be locally processing GET output. This makes my system idle for a while only to get overwhelmed when several outputs complete simultaneously, then jumping back to idle waiting for more requests to finish.

Is there a way to await whatever future is first completed in the executor?

like image 642
Mirac7 Avatar asked Sep 08 '16 22:09

Mirac7


People also ask

What does await do in Asyncio?

The keyword await passes function control back to the event loop. (It suspends the execution of the surrounding coroutine.) If Python encounters an await f() expression in the scope of g() , this is how await tells the event loop, “Suspend execution of g() until whatever I'm waiting on—the result of f() —is returned.

What is Asyncio ensure Future?

ensure_future is a method to create Task from coroutine . It creates tasks in different ways based on argument (including using of create_task for coroutines and future-like objects).

How many times should Asyncio run () be called?

It should be used as a main entry point for asyncio programs, and should ideally only be called once. New in version 3.7.

Why is Asyncio better than threads?

Asyncio vs threading: Async runs one block of code at a time while threading just one line of code at a time. With async, we have better control of when the execution is given to other block of code but we have to release the execution ourselves.


1 Answers

Looks like you are looking for asyncio.wait with return_when=asyncio.FIRST_COMPLETED.

def fetch(x):
    sleep()

async def main():
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
    while futures:
        done, futures = await asyncio.wait(futures, 
            loop=loop, return_when=asyncio.FIRST_COMPLETED)  
        for f in done:
            await f

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
like image 150
Andrew Svetlov Avatar answered Oct 05 '22 21:10

Andrew Svetlov