This is on Linux, Python 3.5.1.
I'm developing a monitor process with asyncio, whose tasks at various places await asyncio.sleep calls of various durations.
There are points in time when I would like to be able to interrupt all said asyncio.sleep calls and let all tasks proceed normally, but I can't find how to do that. An example is a graceful shutdown of the monitor process.
I thought that I could send an ALRM signal to that effect, but the process dies. I tried catching the ALRM signal with:
    def sigalrm_sent(signum, frame):
        tse.logger.info("got SIGALRM")

    signal.signal(signal.SIGALRM, sigalrm_sent)
Then I get the log line about catching SIGALRM, but the asyncio.sleep calls are not interrupted.
At this point, I replaced all asyncio.sleep calls with calls to this coroutine:
    async def interruptible_sleep(seconds):
        while seconds > 0 and not tse.stop_requested:
            duration = min(seconds, tse.TIME_QUANTUM)
            await asyncio.sleep(duration)
            seconds -= duration
So I only have to pick a TIME_QUANTUM that is not too small and not too large either.
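To make the trade-off concrete, here is a minimal runnable sketch of the polling approach. It assumes Python 3.7+ (for asyncio.run), and the tse object is a hypothetical stand-in for the real module, with a 0.05 s quantum picked only for the demo. The wake-up latency after a stop request is bounded by roughly one TIME_QUANTUM.

```python
import asyncio
import time
from types import SimpleNamespace

# Hypothetical stand-in for the question's `tse` module (an assumption).
tse = SimpleNamespace(stop_requested=False, TIME_QUANTUM=0.05)


async def interruptible_sleep(seconds):
    """Sleep in TIME_QUANTUM slices, checking the stop flag between slices."""
    while seconds > 0 and not tse.stop_requested:
        duration = min(seconds, tse.TIME_QUANTUM)
        await asyncio.sleep(duration)
        seconds -= duration


async def demo():
    loop = asyncio.get_running_loop()
    # Request a stop 0.1 s into a nominal 10 s sleep.
    loop.call_later(0.1, setattr, tse, "stop_requested", True)
    start = time.monotonic()
    await interruptible_sleep(10)
    return time.monotonic() - start


elapsed = asyncio.run(demo())
print("woke after about {:.2f} s instead of 10 s".format(elapsed))
```

A smaller quantum shortens the shutdown delay but wakes every sleeping task more often, which is the cost the question is trying to avoid.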
Is there a way to interrupt all running asyncio.sleep calls that I am missing?
Interrupting all running calls of asyncio.sleep seems a bit dangerous, since it can be used in other parts of the code for other purposes. Instead, I would make a dedicated sleep coroutine that keeps track of its running calls. It is then possible to interrupt them all by cancelling the corresponding tasks:
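    import asyncio

    def make_sleep():
        async def sleep(delay, result=None, *, loop=None):
            # Note: the loop argument was removed from asyncio.sleep in Python 3.10.
            coro = asyncio.sleep(delay, result=result, loop=loop)
            # Wrap the sleep in its own task so that it can be cancelled individually.
            task = asyncio.ensure_future(coro)
            sleep.tasks.add(task)
            try:
                return await task
            except asyncio.CancelledError:
                # A cancelled sleep simply returns early.
                return result
            finally:
                sleep.tasks.remove(task)

        sleep.tasks = set()
        # task.cancel() returns True for each task that was still pending.
        sleep.cancel_all = lambda: sum(task.cancel() for task in sleep.tasks)
        return sleep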
Example:
    async def main(sleep, loop):
        for i in range(10):
            loop.create_task(sleep(i))
        await sleep(3)
        nb_cancelled = sleep.cancel_all()
        await asyncio.wait(sleep.tasks)
        return nb_cancelled

    sleep = make_sleep()
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(main(sleep, loop))
    print(result)  # prints 6
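On current Python versions the code above needs small changes, because asyncio.sleep no longer accepts a loop argument (it was removed in Python 3.10). The following is a sketch of the same idea under that assumption, using asyncio.run (Python 3.7+). The delays are scaled down to multiples of 0.2 s so the demo finishes quickly, and set.discard replaces set.remove purely as a defensive choice; neither change comes from the answer itself.

```python
import asyncio


def make_sleep():
    async def sleep(delay, result=None):
        # Wrap each sleep in its own task so it can be cancelled individually.
        task = asyncio.ensure_future(asyncio.sleep(delay, result))
        sleep.tasks.add(task)
        try:
            return await task
        except asyncio.CancelledError:
            # A cancelled sleep returns early instead of propagating.
            return result
        finally:
            sleep.tasks.discard(task)

    sleep.tasks = set()
    # task.cancel() returns True only for tasks that were still pending.
    sleep.cancel_all = lambda: sum(task.cancel() for task in sleep.tasks)
    return sleep


async def main(sleep):
    # Ten background sleepers: 0.0 s, 0.2 s, ..., 1.8 s.
    background = [asyncio.ensure_future(sleep(i * 0.2)) for i in range(10)]
    await sleep(0.7)
    nb_cancelled = sleep.cancel_all()
    # Wait until every sleeper has actually returned.
    await asyncio.gather(*background)
    return nb_cancelled


nb_cancelled = asyncio.run(main(make_sleep()))
# Sleeps of 0.8 s and longer should still be pending at 0.7 s, so this
# is expected to report 6 cancelled sleeps.
print(nb_cancelled)
```

Waiting on the background tasks with asyncio.gather, rather than on sleep.tasks, also avoids calling asyncio.wait with an empty set when nothing is left to cancel.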
For debugging purposes, loop.time = lambda: float('inf') also works.
Based on Vincent's answer, I used the following class (every instance of the class can cancel all its running .sleep tasks, allowing better compartmentalization):
    import asyncio as aio


    class Sleeper:
        "Group sleep calls allowing instant cancellation of all"

        def __init__(self, loop):
            self.loop = loop
            self.tasks = set()

        async def sleep(self, delay, result=None):
            # Note: the loop argument was removed from asyncio.sleep in Python 3.10.
            coro = aio.sleep(delay, result=result, loop=self.loop)
            task = aio.ensure_future(coro)
            self.tasks.add(task)
            try:
                return await task
            except aio.CancelledError:
                return result
            finally:
                self.tasks.remove(task)

        def cancel_all_helper(self):
            "Cancel all pending sleep tasks"
            cancelled = set()
            for task in self.tasks:
                if task.cancel():
                    cancelled.add(task)
            return cancelled

        async def cancel_all(self):
            "Coroutine cancelling tasks"
            cancelled = self.cancel_all_helper()
            # asyncio.wait raises ValueError when given an empty set.
            if self.tasks:
                await aio.wait(self.tasks)
            self.tasks -= cancelled
            return len(cancelled)
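To illustrate the compartmentalization, here is a small sketch with two independent groups of sleepers. It assumes Python 3.7+ and uses a condensed variant of the class above: there is no loop argument, and cancel_all is a plain method rather than a coroutine. The worker function and the group names are hypothetical, chosen only for the demo.

```python
import asyncio as aio


class Sleeper:
    """Condensed variant of the class above (an assumption, not the original)."""

    def __init__(self):
        self.tasks = set()

    async def sleep(self, delay, result=None):
        task = aio.ensure_future(aio.sleep(delay, result))
        self.tasks.add(task)
        try:
            return await task
        except aio.CancelledError:
            return result
        finally:
            self.tasks.discard(task)

    def cancel_all(self):
        # Counts the sleeps that were still pending when cancelled.
        return sum(task.cancel() for task in self.tasks)


async def worker(sleeper, log, name):
    await sleeper.sleep(3600)  # nominally one hour
    log.append(name)


async def main():
    monitors, housekeeping = Sleeper(), Sleeper()
    log = []
    a = aio.ensure_future(worker(monitors, log, "monitor"))
    b = aio.ensure_future(worker(housekeeping, log, "housekeeping"))
    await aio.sleep(0.05)  # let both workers reach their sleep
    cancelled = monitors.cancel_all()  # wakes only the first group
    await a
    still_sleeping = not b.done()
    housekeeping.cancel_all()
    await b
    return cancelled, still_sleeping, log


cancelled, still_sleeping, log = aio.run(main())
print(cancelled, still_sleeping, log)
```

Cancelling one Sleeper leaves sleeps registered with the other untouched, so unrelated parts of a program can keep their own timing.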