I'm trying to use asyncio to handle concurrent network I/O. A very large number of functions are scheduled at a single point, and the time each one takes to complete varies greatly. The received data is then processed in a separate process for each output.
The order in which the data is processed is not relevant, so given the potentially very long wait for output, I'd like to await whichever future finishes first instead of awaiting them in a predefined order.
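```python
import asyncio
import time

def fetch(x):
    time.sleep(x % 5)  # placeholder for a blocking call of variable duration
    return x

async def main():
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
    for f in futures:  # awaits strictly in submission order
        await f

asyncio.run(main())
```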
Normally, awaiting the futures in the order in which they were queued is fine:
In the timeline diagram, blue represents the time each task spends in the executor's queue, i.e. `run_in_executor` has been called but the function has not started yet, because the executor runs only 5 tasks simultaneously; green is the time spent executing the function itself; and red is the time spent waiting for all previous futures to be awaited.
In my case, where the functions' run times vary greatly, a lot of time is lost waiting for earlier futures in the queue to be awaited, time during which I could already be processing the GET output locally. This leaves my system idle for a while, then overwhelmed when several outputs complete at once, and then idle again while it waits for more requests to finish.
Is there a way to await whichever future is completed first in the executor?
The `await` keyword passes control back to the event loop: it suspends execution of the surrounding coroutine. If Python encounters an `await f()` expression in the body of `g()`, this is how `await` tells the event loop, "Suspend execution of `g()` until whatever I'm waiting on, the result of `f()`, is returned."
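A small sketch of that suspension, using hypothetical coroutines `f()`, `g()` and `other()`: while `g()` is suspended at `await f()` (and `f()` in turn at `asyncio.sleep`), the event loop is free to run `other()`. The `events` list records the order in which things happen.

```python
import asyncio

events = []

async def f():
    events.append("f starts")
    await asyncio.sleep(0.1)  # suspends here; the loop can run other tasks
    events.append("f returns")
    return 42

async def g():
    events.append("g before await")
    value = await f()  # g is suspended until f() produces its result
    events.append(f"g resumes with {value}")
    return value

async def other():
    events.append("other runs while g is suspended")

async def main():
    # gather schedules g() and other() as two tasks on the same loop
    return await asyncio.gather(g(), other())

print(asyncio.run(main()))
print(events)
```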
`asyncio.ensure_future()` turns its argument into something the event loop can schedule: a coroutine is wrapped in a new `Task` (via `create_task`), another awaitable is wrapped in a `Task` that awaits it, and an object that is already a `Future` or `Task` is returned unchanged.
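A minimal sketch of the coroutine and existing-future cases, assuming Python 3.7+; `work()` is a hypothetical coroutine used only for illustration.

```python
import asyncio

async def work():
    return "done"

async def main():
    loop = asyncio.get_running_loop()

    # A coroutine is wrapped in a new Task scheduled on the running loop
    task = asyncio.ensure_future(work())
    assert isinstance(task, asyncio.Task)

    # An existing Future is returned as-is, not wrapped a second time
    fut = loop.create_future()
    assert asyncio.ensure_future(fut) is fut

    fut.set_result("already set")
    return await task, await fut

print(asyncio.run(main()))
```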
`asyncio.run()` should be used as the main entry point for asyncio programs and should ideally only be called once. It was added in Python 3.7.
Asyncio vs. threading: asyncio runs one coroutine at a time and switches to another only at an `await`, whereas the interpreter can switch between threads at almost any point. With asyncio we have finer control over when execution is handed to another block of code, but we have to release it ourselves.
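To illustrate why the release has to be explicit, this sketch (with hypothetical helpers `cooperative`, `uncooperative` and `timed`) runs three 0.2 s waits concurrently. With `await asyncio.sleep` they overlap; a blocking `time.sleep` inside a coroutine stalls the whole loop instead, which is also why the blocking `fetch` in the question is handed off to `run_in_executor`.

```python
import asyncio
import time

async def cooperative(delay):
    await asyncio.sleep(delay)  # yields control to the event loop while waiting

async def uncooperative(delay):
    time.sleep(delay)  # blocks the event loop; no other coroutine can run

async def timed(worker, n=3, delay=0.2):
    # Run n copies of the worker concurrently and measure wall-clock time
    start = time.perf_counter()
    await asyncio.gather(*(worker(delay) for _ in range(n)))
    return time.perf_counter() - start

coop = asyncio.run(timed(cooperative))
blocking = asyncio.run(timed(uncooperative))
print(f"cooperative: {coop:.2f}s, blocking: {blocking:.2f}s")
```

The cooperative run should take roughly one delay, the blocking run roughly the sum of all three.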
Looks like you are looking for `asyncio.wait` with `return_when=asyncio.FIRST_COMPLETED`.
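```python
import asyncio
import time

def fetch(x):
    time.sleep(x % 5)  # placeholder for a blocking call of variable duration
    return x

async def main():
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
    while futures:
        # Returns as soon as at least one future is done, whatever its position;
        # the loop= argument seen in older code was removed in Python 3.10
        done, futures = await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)
        for f in done:
            result = await f  # already finished, so this returns immediately
            # ... process result here ...

asyncio.run(main())
```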
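An alternative sketch, assuming Python 3.7+, uses `asyncio.as_completed`, which yields awaitables in the order the underlying futures finish, so each result can be processed as soon as it is ready without managing the pending set by hand. `fetch` here is a hypothetical stand-in whose later items deliberately finish sooner. For code that uses a `concurrent.futures` executor directly, without asyncio, `concurrent.futures.as_completed` plays the same role.

```python
import asyncio
import time

def fetch(x):
    # Hypothetical blocking call: later items finish sooner
    time.sleep((5 - x) * 0.1)
    return x

async def main():
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(None, fetch, x) for x in range(5)]
    processed = []
    # as_completed yields awaitables in completion order, not submission order
    for next_done in asyncio.as_completed(futures):
        result = await next_done
        processed.append(result)  # process each result as soon as it is ready
    return processed

print(asyncio.run(main()))
```

With these durations and the default thread pool (which has at least 5 workers), the results should come back in reverse submission order.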