Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Handling tasks that never terminate in python3 asyncio

Sometimes async tasks have no meaningful termination condition - for instance, in the program below, the "rate_limiter" task generates a stream of tokens on a queue, at a fixed rate, forever.

import asyncio
import sys

@asyncio.coroutine
def rate_limiter(queue, rate):
    """Push tokens to QUEUE at a rate of RATE per second."""
    delay = 1/rate
    while True:
        yield from asyncio.sleep(delay)
        yield from queue.put(None)

@asyncio.coroutine
def do_work(n, rate):
    for i in range(n):
        yield from rate.get()
        sys.stdout.write("job {}\n".format(i))

def main():
    loop   = asyncio.get_event_loop()
    rate   = asyncio.Queue()
    rltask = loop.create_task(rate_limiter(rate, 10))
    wtask  = loop.create_task(do_work(20, rate))
    loop.run_until_complete(wtask)

main()

This program works perfectly except that the asyncio library considers it a programming error to just throw away rltask when there's nothing left to rate-limit; you get a complaint like

...
job 18
job 19
Task was destroyed but it is pending!
task: <Task pending coro=<rate_limiter() running at rl.py:9>
      wait_for=<Future pending cb=[Task._wakeup()]>>

(whether or not in debug mode).

I can kludge around this with, like, an Event that tells the rate_limiter coroutine to break out of its loop, but that feels like extra code to no real benefit. How are you supposed to deal with this sort of situation when using asyncio?

EDIT: I was unclear: what I am looking for is something like the daemon flag on threads: something that makes it so I don't have to wait for a particular task, ideally expressed as an annotation on the task itself, or its coroutine. I would also accept an answer that demonstrates that there is no such mechanism. I'm already aware of workarounds.

like image 890
zwol Avatar asked Oct 18 '22 22:10

zwol


2 Answers

To avoid "Task was destroyed but it is pending!" warning, you could mark a never ending coroutine as finished on exit from the program if you set a dummy result for the corresponding future object:

#!/usr/bin/env python3.5
import asyncio
import itertools
from contextlib import closing, contextmanager


@contextmanager
def finishing(coro_or_future, *, loop=None):
    """Mark a never ending coroutine or future as done on __exit__."""
    fut = asyncio.ensure_future(
        coro_or_future, loop=loop)  # start infinite loop
    try:
        yield
    finally:
        if not fut.cancelled():
            fut.set_result(None)  # mark as finished


async def never_ends():
    for c in itertools.cycle('\|/-'):
        print(c, end='\r', flush=True)
        await asyncio.sleep(.3)


with closing(asyncio.get_event_loop()) as loop, \
     finishing(never_ends(), loop=loop):
    loop.run_until_complete(asyncio.sleep(3))  # do something else

It assumes that your coroutine does not require an explicit cleanup before the process exits. In the latter case, define an explicit procedure for the cleanup: provide methods that could be called (e.g., server.close(), server.wait_closed()), or pass an event (asyncio.Event) that a caller should set on shutdown, or raise an exception (such as CancelledError).

The benefit of introducing finishing() is to detect bugs i.e., you should not ignore the warning unless it is explicitly silenced by the finishing() call.

like image 80
jfs Avatar answered Oct 22 '22 22:10

jfs


.cancel() the task and then wait for it be cancelled, catching the CancelledError outside:

# vim: tabstop=4 expandtab

import asyncio
import sys

@asyncio.coroutine
def rate_limiter(queue, rate):
    """Push tokens to QUEUE at a rate of RATE per second."""
    delay = 1/rate
    while True:
        yield from asyncio.sleep(delay)
        yield from queue.put(None)

@asyncio.coroutine
def do_work(n, rate):
    for i in range(n):
        yield from rate.get()
        sys.stdout.write("job {}\n".format(i))

def main():
    loop   = asyncio.get_event_loop()
    rate   = asyncio.Queue()
    rltask = loop.create_task(rate_limiter(rate, 10))
    wtask  = loop.create_task(do_work(20, rate))
    loop.run_until_complete(wtask)
    rltask.cancel()
    try:
        loop.run_until_complete(rltask)
    except asyncio.CancelledError:
        ...
    loop.close()

main()
like image 44
Jashandeep Sohi Avatar answered Oct 22 '22 22:10

Jashandeep Sohi