Sometimes async tasks have no meaningful termination condition - for instance, in the program below, the "rate_limiter" task generates a stream of tokens on a queue, at a fixed rate, forever.
import asyncio
import sys

@asyncio.coroutine
def rate_limiter(queue, rate):
    """Push tokens to QUEUE at a rate of RATE per second."""
    delay = 1/rate
    while True:
        yield from asyncio.sleep(delay)
        yield from queue.put(None)

@asyncio.coroutine
def do_work(n, rate):
    for i in range(n):
        yield from rate.get()
        sys.stdout.write("job {}\n".format(i))

def main():
    loop = asyncio.get_event_loop()
    rate = asyncio.Queue()
    rltask = loop.create_task(rate_limiter(rate, 10))
    wtask = loop.create_task(do_work(20, rate))
    loop.run_until_complete(wtask)

main()
This program works perfectly except that the asyncio library considers it a programming error to just throw away rltask
when there's nothing left to rate-limit; you get a complaint like
...
job 18
job 19
Task was destroyed but it is pending!
task: <Task pending coro=<rate_limiter() running at rl.py:9>
wait_for=<Future pending cb=[Task._wakeup()]>>
(whether or not in debug mode).
I can kludge around this with, like, an Event that tells the rate_limiter coroutine to break out of its loop, but that feels like extra code to no real benefit. How are you supposed to deal with this sort of situation when using asyncio?
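For reference, the Event-based workaround described above might look something like this. This is my sketch in modern async/await syntax, not the asker's code: the stop_event parameter and the results bookkeeping are illustrative additions.

```python
import asyncio

results = []

async def rate_limiter(queue, rate, stop_event):
    """Push tokens to QUEUE at RATE per second until STOP_EVENT is set."""
    delay = 1 / rate
    while not stop_event.is_set():
        await asyncio.sleep(delay)
        await queue.put(None)

async def do_work(n, queue, stop_event):
    for i in range(n):
        await queue.get()
        results.append(i)
    stop_event.set()  # tell the rate limiter to break out of its loop

async def main():
    queue = asyncio.Queue()
    stop = asyncio.Event()
    limiter = asyncio.ensure_future(rate_limiter(queue, 50, stop))
    await do_work(5, queue, stop)
    await limiter  # finishes normally, so no "Task was destroyed" warning

asyncio.run(main())
```

It works, but as the question says, it is extra plumbing: every never-ending task needs its own shutdown signal threaded through.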
EDIT: I was unclear: what I am looking for is something like the daemon flag on threads: something that makes it so I don't have to wait for a particular task, ideally expressed as an annotation on the task itself, or its coroutine. I would also accept an answer that demonstrates that there is no such mechanism. I'm already aware of workarounds.
To avoid the "Task was destroyed but it is pending!" warning, you could mark a never-ending coroutine as finished on exit from the program by setting a dummy result on the corresponding future object:
#!/usr/bin/env python3.5
import asyncio
import itertools
from contextlib import closing, contextmanager

@contextmanager
def finishing(coro_or_future, *, loop=None):
    """Mark a never ending coroutine or future as done on __exit__."""
    fut = asyncio.ensure_future(
        coro_or_future, loop=loop)  # start infinite loop
    try:
        yield
    finally:
        if not fut.cancelled():
            fut.set_result(None)  # mark as finished

async def never_ends():
    for c in itertools.cycle('\|/-'):
        print(c, end='\r', flush=True)
        await asyncio.sleep(.3)

with closing(asyncio.get_event_loop()) as loop, \
        finishing(never_ends(), loop=loop):
    loop.run_until_complete(asyncio.sleep(3))  # do something else
It assumes that your coroutine does not require an explicit cleanup before the process exits. If it does, define an explicit cleanup procedure: provide methods that can be called (e.g., server.close(), server.wait_closed()), or pass an event (asyncio.Event) that the caller should set on shutdown, or raise an exception (such as CancelledError).
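The exception route can be sketched as follows: a coroutine that runs its teardown in a finally block, which also runs when the task is cancelled. The names worker and cleaned_up are illustrative, not from any library.

```python
import asyncio

cleaned_up = False

async def worker():
    """Runs forever, but releases its resources when cancelled."""
    global cleaned_up
    try:
        while True:
            await asyncio.sleep(0.01)
    finally:
        cleaned_up = True  # stands in for closing files, sockets, etc.

async def main():
    task = asyncio.ensure_future(worker())
    await asyncio.sleep(0.05)
    task.cancel()      # raises CancelledError inside worker()
    try:
        await task     # wait for the cleanup to finish
    except asyncio.CancelledError:
        pass

asyncio.run(main())
```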
The benefit of introducing finishing() is bug detection: the warning should not be ignored unless it has been explicitly silenced by a finishing() call.
.cancel() the task and then wait for it to be cancelled, catching the CancelledError outside:
# vim: tabstop=4 expandtab
import asyncio
import sys

@asyncio.coroutine
def rate_limiter(queue, rate):
    """Push tokens to QUEUE at a rate of RATE per second."""
    delay = 1/rate
    while True:
        yield from asyncio.sleep(delay)
        yield from queue.put(None)

@asyncio.coroutine
def do_work(n, rate):
    for i in range(n):
        yield from rate.get()
        sys.stdout.write("job {}\n".format(i))

def main():
    loop = asyncio.get_event_loop()
    rate = asyncio.Queue()
    rltask = loop.create_task(rate_limiter(rate, 10))
    wtask = loop.create_task(do_work(20, rate))
    loop.run_until_complete(wtask)
    rltask.cancel()
    try:
        loop.run_until_complete(rltask)
    except asyncio.CancelledError:
        ...
    loop.close()

main()
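For what it's worth, on Python 3.7+ the same cancel-then-await pattern can be written with native coroutines, asyncio.run(), and contextlib.suppress(). A minimal sketch (the rate and job count are illustrative, and the jobs list replaces the stdout writes so the result is easy to inspect):

```python
import asyncio
import contextlib

jobs = []

async def rate_limiter(queue, rate):
    """Push tokens to QUEUE at a rate of RATE per second."""
    delay = 1 / rate
    while True:
        await asyncio.sleep(delay)
        await queue.put(None)

async def do_work(n, queue):
    for i in range(n):
        await queue.get()
        jobs.append(i)

async def main():
    queue = asyncio.Queue()
    rltask = asyncio.ensure_future(rate_limiter(queue, 100))
    await do_work(5, queue)
    rltask.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await rltask  # wait until the cancellation is processed

asyncio.run(main())
```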