Interrupt all asyncio.sleep currently executing

where

This is on Linux, Python 3.5.1.

what

I'm developing a monitor process with asyncio whose tasks, at various places, await asyncio.sleep calls of various durations.

At certain points I would like to interrupt all of those asyncio.sleep calls and let every task proceed normally, but I can't find a way to do that. One example is graceful shutdown of the monitor process.

how (failed assumption)

I thought that I could send an ALRM signal to that effect, but the process dies. I tried catching the ALRM signal with:

def sigalrm_sent(signum, frame):
    tse.logger.info("got SIGALRM")

signal.signal(signal.SIGALRM, sigalrm_sent)

Then I get the log line about catching SIGALRM, but the asyncio.sleep calls are not interrupted.
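The handler above only logs, so nothing tells the sleeping coroutines to wake up. A minimal sketch of one alternative, assuming Python 3.7+ on a Unix event loop: register the callback with loop.add_signal_handler so it runs inside the loop, and have it set an asyncio.Event that tasks can wait on. The names run_demo and stop_event are hypothetical, and the os.kill call only simulates an external kill -ALRM.

```python
import asyncio
import os
import signal


async def run_demo():
    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()
    # The callback is invoked by the event loop itself, so it can safely
    # touch asyncio objects such as the Event.
    loop.add_signal_handler(signal.SIGALRM, stop_event.set)
    try:
        # Simulate an external `kill -ALRM <pid>` shortly after start.
        loop.call_later(0.05, os.kill, os.getpid(), signal.SIGALRM)
        # Wait for the signal; raises a timeout error if it never arrives.
        await asyncio.wait_for(stop_event.wait(), timeout=5)
        return stop_event.is_set()
    finally:
        loop.remove_signal_handler(signal.SIGALRM)
```

Running asyncio.run(run_demo()) should return True once the signal has been delivered. The signal by itself still interrupts nothing; tasks have to wait on the event (or be cancelled) for it to have any effect.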

how (kludge)

At this point, I replaced all asyncio.sleep calls with calls to this coroutine:

async def interruptible_sleep(seconds):
    while seconds > 0 and not tse.stop_requested:
        duration = min(seconds, tse.TIME_QUANTUM)
        await asyncio.sleep(duration)
        seconds -= duration

So I only have to pick a TIME_QUANTUM that is not too small and not too large either.
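A polling-free variant of the same idea can be sketched with an asyncio.Event, assuming Python 3.7+ and that the stop flag can be replaced by an event object. Here stop_event and demo are hypothetical names, and this interruptible_sleep takes the event as an extra argument instead of reading tse.stop_requested.

```python
import asyncio


async def interruptible_sleep(seconds, stop_event):
    """Sleep up to `seconds`; return True if stop_event cut the sleep short."""
    try:
        await asyncio.wait_for(stop_event.wait(), timeout=seconds)
        return True
    except asyncio.TimeoutError:
        return False


async def demo():
    stop_event = asyncio.Event()
    # Three long sleeps running concurrently.
    sleepers = [
        asyncio.ensure_future(interruptible_sleep(d, stop_event))
        for d in (10, 20, 30)
    ]
    # A short sleep that simply times out, since the event is not set yet.
    short = await interruptible_sleep(0.01, stop_event)
    # Setting the event wakes every pending sleeper at once.
    stop_event.set()
    interrupted = await asyncio.gather(*sleepers)
    return short, interrupted
```

With this approach there is no TIME_QUANTUM to tune: each sleeper wakes either at its own deadline or as soon as the event is set.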

but

Is there a way to interrupt all running asyncio.sleep calls that I am missing?

asked May 13 '16 12:05 by tzot


2 Answers

Interrupting all running calls of asyncio.sleep seems a bit dangerous since it can be used in other parts of the code, for other purposes. Instead I would make a dedicated sleep coroutine that keeps track of its running calls. It is then possible to interrupt them all by canceling the corresponding tasks:

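import asyncio

def make_sleep():
    async def sleep(delay, result=None):
        # Wrap the real sleep in a task so it can be tracked and cancelled.
        # (asyncio.sleep no longer accepts a loop argument since Python 3.10.)
        coro = asyncio.sleep(delay, result=result)
        task = asyncio.ensure_future(coro)
        sleep.tasks.add(task)
        try:
            return await task
        except asyncio.CancelledError:
            # Interrupted: proceed as if the sleep had finished.
            return result
        finally:
            sleep.tasks.remove(task)

    # Pending sleep tasks, plus a helper returning how many were cancelled.
    sleep.tasks = set()
    sleep.cancel_all = lambda: sum(task.cancel() for task in sleep.tasks)
    return sleep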

Example:

async def main(sleep, loop):
    for i in range(10):
        loop.create_task(sleep(i))
    await sleep(3)
    nb_cancelled = sleep.cancel_all()
    await asyncio.wait(sleep.tasks)
    return nb_cancelled

sleep = make_sleep()
loop = asyncio.get_event_loop()
result = loop.run_until_complete(main(sleep, loop))
print(result)  # prints 6

For debugging purposes, loop.time = lambda: float('inf') also works: the loop then treats every scheduled timer as already due.

answered Sep 27 '22 21:09 by Vincent


Based on Vincent's answer, I used the following class (every instance of the class can cancel all its running .sleep tasks, allowing better compartmentalization):

import asyncio as aio

class Sleeper:
    "Group sleep calls allowing instant cancellation of all"

    def __init__(self, loop):
        self.loop = loop
        self.tasks = set()

    async def sleep(self, delay, result=None):
        # asyncio.sleep no longer accepts loop= (removed in Python 3.10),
        # so the loop is passed to ensure_future instead.
        coro = aio.sleep(delay, result=result)
        task = aio.ensure_future(coro, loop=self.loop)
        self.tasks.add(task)
        try:
            return await task
        except aio.CancelledError:
            return result
        finally:
            # discard, not remove: cancel_all may already have dropped the task
            self.tasks.discard(task)

    def cancel_all_helper(self):
        "Cancel all pending sleep tasks"
        cancelled = set()
        for task in self.tasks:
            if task.cancel():
                cancelled.add(task)
        return cancelled

    async def cancel_all(self):
        "Coroutine cancelling tasks"
        cancelled = self.cancel_all_helper()
        if self.tasks:
            # asyncio.wait raises ValueError on an empty set
            await aio.wait(self.tasks)
        self.tasks -= cancelled
        return len(cancelled)
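
To illustrate the compartmentalization, here is a small usage sketch, assuming Python 3.7+. It uses a condensed restatement of the class so the snippet runs on its own; monitor, demo, and the group names "a" and "b" are hypothetical and not part of the answer.

```python
import asyncio as aio


class Sleeper:
    """Condensed restatement of the class above so this snippet runs alone."""

    def __init__(self):
        self.tasks = set()

    async def sleep(self, delay, result=None):
        task = aio.ensure_future(aio.sleep(delay, result=result))
        self.tasks.add(task)
        try:
            return await task
        except aio.CancelledError:
            return result
        finally:
            self.tasks.discard(task)

    def cancel_all(self):
        # Number of sleep tasks that were actually cancelled.
        return sum(task.cancel() for task in self.tasks)


async def monitor(sleeper, name, period, log):
    # Hypothetical monitor task: one sleep, then it records that it resumed.
    await sleeper.sleep(period)
    log.append(name)


async def demo():
    group_a, group_b = Sleeper(), Sleeper()
    log = []
    tasks = [
        aio.ensure_future(monitor(group_a, "a", 30, log)),
        aio.ensure_future(monitor(group_b, "b", 0.2, log)),
    ]
    await aio.sleep(0.01)             # let both monitors reach their sleep
    cancelled = group_a.cancel_all()  # interrupts only group_a's sleeps
    await aio.gather(*tasks)
    return cancelled, log
```

In this sketch only the 30-second sleep in group "a" is interrupted; the 0.2-second sleep in group "b" runs to completion untouched.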
answered Sep 27 '22 23:09 by tzot