Using Python's asyncio module, how do I select the first result from multiple coroutines?
For example, I might want to implement a timeout when waiting on a queue:

result = await select(asyncio.sleep(1), queue.get())

This would be similar to Go's select or Clojure's core.async.alt!. It is something like the converse of asyncio.gather (gather is like all, select would be like any).
A simple solution is to use asyncio.wait with return_when=asyncio.FIRST_COMPLETED:
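
import asyncio

async def something_to_wait():
    await asyncio.sleep(1)
    return "something_to_wait"

async def something_else_to_wait():
    await asyncio.sleep(2)
    return "something_else_to_wait"

async def wait_first():
    # asyncio.wait() needs Tasks/Futures; bare coroutines are rejected on Python 3.11+.
    tasks = [asyncio.ensure_future(something_to_wait()),
             asyncio.ensure_future(something_else_to_wait())]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)
    print("done", done)
    print("pending", pending)

# Create the loop explicitly; get_event_loop() without a running loop is deprecated.
asyncio.new_event_loop().run_until_complete(wait_first())
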
This prints:
done {<Task finished coro=<something_to_wait() done, defined at stack.py:3> result='something_to_wait'>}
pending {<Task pending coro=<something_else_to_wait() running at stack.py:8> wait_for=<Future pending cb=[Task._wakeup()]>>}
Task was destroyed but it is pending!
task: <Task pending coro=<something_else_to_wait() running at stack.py:8> wait_for=<Future pending cb=[Task._wakeup()]>>
You can implement this using either asyncio.wait or asyncio.as_completed:

import asyncio

async def ok():
    await asyncio.sleep(1)
    return 5

async def select1(*futures):
    # as_completed() yields awaitables in completion order; await the first one.
    return (await next(asyncio.as_completed(futures)))

async def select2(*futures):
    # Wrap in Tasks first: asyncio.wait() rejects bare coroutines on Python 3.11+.
    tasks = [asyncio.ensure_future(f) for f in futures]
    done, running = await asyncio.wait(tasks,
                                       return_when=asyncio.FIRST_COMPLETED)
    result = done.pop()
    return result.result()

async def example():
    queue = asyncio.Queue()
    result = await select1(ok(), queue.get())
    print('got {}'.format(result))
    result = await select2(queue.get(), ok())
    print('got {}'.format(result))

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(example())

Output:
got 5
got 5
Task was destroyed but it is pending!
task: <Task pending coro=<get() done, defined at /usr/lib/python3.4/asyncio/queues.py:170> wait_for=<Future pending cb=[Task._wakeup()]> cb=[as_completed.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:463]>
Task was destroyed but it is pending!
task: <Task pending coro=<get() done, defined at /usr/lib/python3.4/asyncio/queues.py:170> wait_for=<Future pending cb=[Task._wakeup()]>>
Both implementations return the result of the first completed Future, but you can easily tweak them to return the Future itself instead. Note that because the other Future passed to each select implementation is never awaited or cancelled, a "Task was destroyed but it is pending!" warning is logged when the process exits.
If you want to apply a timeout to a task, there is a standard library function that does exactly this: asyncio.wait_for(). Your example can be written like this:

try:
    result = await asyncio.wait_for(queue.get(), timeout=1)
except asyncio.TimeoutError:
    # This block will execute if queue.get() takes more than 1s.
    result = ...

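To try the timeout pattern end to end, here is a minimal self-contained sketch. The helper name get_with_timeout and the choice to return None on timeout are assumptions made for illustration; they are not part of asyncio.

```python
import asyncio


async def get_with_timeout(queue, timeout):
    """Return the next queue item, or None if nothing arrives in time."""
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for() cancels the pending queue.get() before raising.
        return None


async def demo():
    queue = asyncio.Queue()
    empty = await get_with_timeout(queue, 0.05)   # nothing queued: times out
    queue.put_nowait("item")
    filled = await get_with_timeout(queue, 0.05)  # item is already available
    return empty, filled


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (None, 'item')
```

Because wait_for() cancels the wrapped awaitable on timeout, no pending task is left behind here.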
However, this only works for the specific case of a timeout. The other two answers here generalize to any arbitrary set of tasks, but neither shows how to clean up the tasks that don't finish first. That is what causes the "Task was destroyed but it is pending" messages in their output. In practice, you should do something with those pending tasks. Based on your example, I'll assume that you don't care about the other tasks' results. Here's an example of a wait_first() function that returns the value of the first completed task and cancels the remaining tasks.

import asyncio
import random

async def foo(x):
    r = random.random()
    print('foo({:d}) sleeping for {:0.3f}'.format(x, r))
    await asyncio.sleep(r)
    print('foo({:d}) done'.format(x))
    return x

async def wait_first(*futures):
    ''' Return the result of the first future to finish. Cancel the remaining
    futures. '''
    # asyncio.wait() rejects bare coroutines on Python 3.11+, so wrap them in Tasks.
    tasks = [asyncio.ensure_future(f) for f in futures]
    done, pending = await asyncio.wait(tasks,
                                       return_when=asyncio.FIRST_COMPLETED)
    # Cancelling the gather cancels every task that is still pending.
    gather = asyncio.gather(*pending)
    gather.cancel()
    try:
        await gather
    except asyncio.CancelledError:
        pass
    return done.pop().result()

async def main():
    result = await wait_first(foo(1), foo(2))
    print('the result is {}'.format(result))

if __name__ == '__main__':
    asyncio.run(main())

Running this example:
# export PYTHONASYNCIODEBUG=1
# python3 test.py
foo(1) sleeping for 0.381
foo(2) sleeping for 0.279
foo(2) done
the result is 2
# python3 test.py
foo(1) sleeping for 0.048
foo(2) sleeping for 0.515
foo(1) done
the result is 1
# python3 test.py
foo(1) sleeping for 0.396
foo(2) sleeping for 0.188
foo(2) done
the result is 2
There are no error messages about pending tasks, because each pending task has been cleaned up correctly.
In practice, you probably want wait_first() to return the future, not the future's result; otherwise it is hard to tell which future finished. In the example here I returned the future's result because it looks a little cleaner.
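As a rough sketch of that variant, the function below returns the winning task itself rather than its result, so the caller can compare it against the tasks it passed in. The name wait_first_task and the tie-break rule (earliest argument wins if several finish together) are assumptions made for this example.

```python
import asyncio


async def wait_first_task(*aws):
    """Return the first awaitable to finish, as a Task; cancel the others."""
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Wait for the cancellations to finish so nothing is left pending at exit.
    await asyncio.gather(*pending, return_exceptions=True)
    # If several tasks finished together, prefer the earliest in argument order.
    return next(task for task in tasks if task in done)


async def demo():
    fast = asyncio.ensure_future(asyncio.sleep(0.01, result="fast"))
    slow = asyncio.ensure_future(asyncio.sleep(1, result="slow"))
    winner = await wait_first_task(fast, slow)
    return winner is fast, winner.result(), slow.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, 'fast', True)
```

Calling winner.result() re-raises the task's exception if it failed, which leaves error handling to the caller.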
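Here's a more robust solution, based on the earlier examples, that handles the following cases: several tasks completing at the same time; a completed task that raised an exception or returned a null result, in which case it keeps waiting on the remaining tasks; and no task producing a result at all, in which case the last exception (if any) is raised.
Note that this example requires Python 3.8 or later because it uses the assignment expression operator (:=).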

import asyncio

async def wait_first(*tasks):
    """Return the result of first async task to complete with a non-null result"""
    # asyncio.wait() rejects bare coroutines on Python 3.11+, so wrap them in Tasks
    tasks = [asyncio.ensure_future(task) for task in tasks]
    # Get first completed task(s)
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Tasks MAY complete at same time e.g. in unit tests :)
    # Coalesce the first result if present
    for task in done:
        exception = task.exception()
        if exception is None and (result := task.result()):
            break
    else:
        result = None
    # Gather remaining tasks without raising exceptions
    gather = asyncio.gather(*pending, return_exceptions=True)
    # Cancel remaining tasks if result is non-null otherwise await next pending tasks
    if result:
        gather.cancel()
    elif pending:
        result = await wait_first(*pending)
    # Await remaining tasks to ensure they are cancelled
    try:
        await gather
    except asyncio.CancelledError:
        pass
    # Return result or raise last exception if no result was returned
    if exception and result is None:
        raise exception
    else:
        return result

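The same "first non-null result" idea can also be sketched without recursion by iterating over asyncio.as_completed. This is an alternative technique, not the approach used above. The name first_non_null is made up for this example, and unlike the code above it tests `is not None` rather than truthiness.

```python
import asyncio


async def first_non_null(*aws):
    """Return the first non-None result; raise the last error if there is none."""
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    last_exception = None
    try:
        for next_done in asyncio.as_completed(tasks):
            try:
                result = await next_done
            except Exception as exc:
                last_exception = exc  # remember it and keep waiting for the others
                continue
            if result is not None:
                return result
    finally:
        # Cancel whatever is still running, then wait for the cancellations.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    if last_exception is not None:
        raise last_exception
    return None


async def demo():
    async def nothing():
        await asyncio.sleep(0.01)
        return None

    async def boom():
        await asyncio.sleep(0.02)
        raise ValueError("boom")

    async def value():
        await asyncio.sleep(0.03)
        return "value"

    return await first_non_null(nothing(), boom(), value(), asyncio.sleep(1, "late"))


if __name__ == "__main__":
    print(asyncio.run(demo()))  # value
```

The finally block runs on every exit path, so unfinished tasks are cancelled whether the function returns a value, raises, or is itself cancelled.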