Reading this answer, I ran across asyncio.tasks.as_completed. I don't understand how that function actually works. It is documented as being a non-async routine that returns futures in the order they complete. It creates a queue associated with the event loop, adds a completion callback to each future, and then attempts to get as many items from the queue as there are futures.
The core of the code is as follows:
def _on_completion(f):
    if not todo:
        return  # _on_timeout() was here first.
    todo.remove(f)
    done.put_nowait(f)
    if not todo and timeout_handle is not None:
        timeout_handle.cancel()

@coroutine
def _wait_for_one():
    f = yield from done.get()
    if f is None:
        # Dummy value from _on_timeout().
        raise futures.TimeoutError
    return f.result()  # May raise f.exception().

for f in todo:
    f.add_done_callback(_on_completion)
if todo and timeout is not None:
    timeout_handle = loop.call_later(timeout, _on_timeout)
for _ in range(len(todo)):
    yield _wait_for_one()
I'd like to understand how this code works. My biggest questions are:
Where does the loop actually run? I don't see any calls to loop.run_until_complete or loop.run_forever. So how does the loop make progress?
The method documentation says that the method returns futures, and that you could call it with something like
for f in as_completed(futures):
    result = yield from f
I'm having trouble reconciling that against the return f.result() line in _wait_for_one. Is the documented calling convention correct? If so, where does that yield come from?
The code you copied is missing a header part that is quite important.
# This is *not* a @coroutine!  It is just an iterator (yielding Futures).

def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = yield from f  # The 'yield from' may raise.
            # Use result.

    If a timeout is specified, the 'yield from' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    loop = loop if loop is not None else events.get_event_loop()
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue(loop=loop)
    timeout_handle = None

    def _on_timeout():
        for f in todo:
            f.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    @coroutine
    def _wait_for_one():
        f = yield from done.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return f.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    for _ in range(len(todo)):
        yield _wait_for_one()
[Where does the loop actually run?]
For simplicity's sake, suppose that timeout is set to None.
as_completed expects an iterable of futures (coroutines are accepted too, but they are immediately wrapped with ensure_future). So these futures are already bound to the loop and scheduled for execution; in other words, they are the output of loop.create_task or asyncio.ensure_future (and this is written nowhere explicitly). The loop is therefore already "running" them, and when they complete, their .done() method will return True.
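Here is a hedged end-to-end sketch (written with modern async def / await, the equivalent of the yield from style above; work is an illustrative name): the loop only makes progress because the caller ultimately invokes asyncio.run() or loop.run_until_complete(), and that outer call drives both the scheduled futures and the as_completed iteration.

import asyncio

async def work(name, delay):
    # Only makes progress while the event loop is running.
    await asyncio.sleep(delay)
    return name

async def main():
    # ensure_future binds the coroutines to the running loop and schedules
    # them -- the state as_completed assumes for the elements of `fs`.
    futures = [asyncio.ensure_future(work("slow", 0.2)),
               asyncio.ensure_future(work("fast", 0.1))]
    for next_done in asyncio.as_completed(futures):
        result = await next_done   # modern spelling of `yield from next_done`
        print(result)              # "fast", then "slow"

# The loop actually runs here; nothing above executes until this call
# (loop.run_until_complete(main()) plays the same role).
asyncio.run(main())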
Then the "done" queue is created. Note that The "done" queue is an istance of asyncio.queue, i.e. a queue that implements the blocking method (.get, .put) »using the loop«.
In the line todo = {ensure_future(f, loop=loop) for f in set(fs)}, each element of fs (a coroutine or future) is wrapped by ensure_future into a future »bound to the loop«, and each of these futures later gets _on_completion added as a done callback.
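A small illustration of that wrapping, as a sketch rather than as_completed's own code (coro is an illustrative name):

import asyncio

async def coro():
    return "hi"

async def main():
    fut = asyncio.ensure_future(coro())       # wraps the coroutine in a Task (a Future)
    print(asyncio.isfuture(fut), fut.done())  # True False -- scheduled, but not run yet
    print(await fut)                          # "hi" -- the loop has now run it

asyncio.run(main())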
The _on_completion function will be called by the loop when it completes the execution of the coroutine whose future was passed to as_completed in fs.
The _on_completion function removes "our future" from the todo set and puts the completed future itself into the done queue. In other words, all as_completed really does is attach a done callback to these futures so that each future is moved into the done queue as soon as it finishes.
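Seen in isolation, that done-callback mechanism looks like this (a minimal sketch; compute and report are illustrative names, not part of asyncio):

import asyncio

async def compute():
    await asyncio.sleep(0.1)
    return 42

def report(fut):
    # The event loop invokes this once the task has finished.
    print("completed with", fut.result())

async def main():
    task = asyncio.ensure_future(compute())
    task.add_done_callback(report)
    await task     # let the loop run the task to completion

asyncio.run(main())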
Then, len(todo) times (once per future in fs), as_completed yields a _wait_for_one() coroutine that blocks on yield from done.get(), waiting for _on_completion (or _on_timeout) to put something into the done queue.
The yield froms executed by the caller of as_completed are what actually wait for a result to appear in the done queue.
[Where does that yield come from?]
It comes from the fact that done is an asyncio.Queue, so the yielded coroutine can (asyncio-)block until a value is .put() into it.
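Putting the pieces together, here is a simplified sketch of the whole mechanism (my_as_completed is illustrative, not asyncio's real implementation): the suspension the caller feels at yield from f / await f is really the suspension of done.get() inside the yielded coroutine.

import asyncio

def my_as_completed(tasks):
    # A plain generator, like as_completed itself: nothing is awaited here.
    done = asyncio.Queue()
    for t in tasks:
        # add_done_callback passes the finished future to the callback,
        # so each completed task lands in the queue (cf. _on_completion).
        t.add_done_callback(done.put_nowait)

    async def wait_for_one():
        fut = await done.get()    # the caller's `await f` suspends right here
        return fut.result()

    for _ in range(len(tasks)):
        yield wait_for_one()

async def main():
    async def job(n):
        await asyncio.sleep(n / 10)
        return n

    tasks = [asyncio.ensure_future(job(n)) for n in (2, 1, 3)]
    for f in my_as_completed(tasks):
        print(await f)            # 1, 2, 3 -- completion order

asyncio.run(main())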