I'm trying two ways to stop an infinite loop from running: cancelling the task programmatically (supervisor_1) and interrupting it with Ctrl+C (supervisor_2).
While supervisor_2 does not throw any errors when interrupted, I cannot stop supervisor_1 from producing "Task was destroyed but it is pending!". Any idea why?
Here is the code:
import asyncio
import aioredis
from functools import partial


class Listener:

    def __init__(self, redis_conn):
        self.redis_conn = redis_conn

    async def forever(self, loop_name):
        counter = 0
        try:
            while True:
                print('{}: {}'.format(loop_name, counter))
                counter += 1
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print('Task Cancelled')
            self.redis_conn.close()
            await self.redis_conn.wait_closed()


async def supervisor_1(redis_conn):
    redis_conn = await redis_conn
    l = Listener(redis_conn)
    task = asyncio.ensure_future(
        asyncio.gather(l.forever('loop_1'),
                       l.forever('loop_2')))
    await asyncio.sleep(2)
    task.cancel()


async def supervisor_2(redis_conn):
    redis_conn = await redis_conn
    l = Listener(redis_conn)
    await asyncio.gather(l.forever('loop_1'),
                         l.forever('loop_2'))


if __name__ == '__main__':
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1)
    loop = asyncio.get_event_loop()
    run = partial(supervisor_2, redis_conn=redis_conn)
    task = asyncio.ensure_future(run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print('Interruped !')
        task.cancel()
        loop.run_forever()
    finally:
        loop.close()
Update:
Thanks to @Gerasimov, here is a version that fixes the problem, but it still raises errors from time to time on KeyboardInterrupt:
from contextlib import suppress


async def supervisor(redis_conn):
    redis_conn = await redis_conn
    l = Listener(redis_conn)
    task = asyncio.ensure_future(
        asyncio.gather(l.forever('loop_1'),
                       l.forever('loop_2'))
    )
    await asyncio.sleep(10)
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task


async def kill_tasks():
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task
and
if __name__ == '__main__':
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1)
    loop = asyncio.get_event_loop()
    run = partial(supervisor, redis_conn=redis_conn)
    task = asyncio.ensure_future(run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print('Interruped !')
        loop.run_until_complete(kill_tasks())
    finally:
        loop.close()
task.cancel() itself doesn't finish the task: it just tells the task that CancelledError should be raised inside it, and returns immediately. You should call it and then await the task until it is actually cancelled (that is, until it raises CancelledError).
You also shouldn't suppress CancelledError inside the task.
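To make the point above concrete, here is a minimal sketch, assuming Python 3.7+ (for asyncio.run). The names worker and demo are made up for illustration and are not from the question. It only shows that the task is still not done right after cancel() returns, and is finished once it has been awaited.

```python
import asyncio


async def worker():
    # Runs until cancelled; CancelledError is deliberately not caught here.
    while True:
        await asyncio.sleep(0.1)


async def demo():
    task = asyncio.ensure_future(worker())
    await asyncio.sleep(0)      # give the worker a chance to start
    task.cancel()               # only *requests* cancellation
    done_right_after_cancel = task.done()
    try:
        await task              # cancellation is actually delivered here
    except asyncio.CancelledError:
        pass
    return done_right_after_cancel, task.cancelled()


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

If the supervisor returns (and the loop is closed) between cancel() and that await, the task is left pending, which is the situation the warning in the question complains about.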
Read this answer, where I tried to show different ways of working with tasks. For example, to cancel a task and wait until it is actually cancelled, you can do:
from contextlib import suppress

task = ...  # remember, task doesn't suppress CancelledError itself

task.cancel()  # returns immediately, we should await task raised CancelledError.

with suppress(asyncio.CancelledError):
    await task  # or loop.run_until_complete(task) if it happens after event loop stopped

# Now when we awaited for CancelledError and handled it,
# task is finally over and we can close event loop without warning.
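Applied to the supervisor from the question, the pattern could look roughly like the sketch below. It is not a drop-in fix: FakeConnection is a hypothetical stand-in for the aioredis pool so that the example runs without a Redis server, the sleep intervals are shortened, and cleanup is moved into a finally block (one possible way to avoid swallowing CancelledError inside the task). Python 3.7+ is assumed for asyncio.run.

```python
import asyncio
from contextlib import suppress


class FakeConnection:
    """Hypothetical stand-in for the Redis pool (not part of aioredis)."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    async def wait_closed(self):
        await asyncio.sleep(0)


async def forever(loop_name, conn, ticks):
    counter = 0
    try:
        while True:
            ticks.append((loop_name, counter))
            counter += 1
            await asyncio.sleep(0.01)
    finally:
        # Cleanup runs on cancellation, and CancelledError still propagates
        # to the caller because it is not caught here.
        conn.close()
        await conn.wait_closed()


async def supervisor():
    conn = FakeConnection()
    ticks = []
    task = asyncio.ensure_future(
        asyncio.gather(forever('loop_1', conn, ticks),
                       forever('loop_2', conn, ticks)))
    await asyncio.sleep(0.05)
    task.cancel()                       # request cancellation
    with suppress(asyncio.CancelledError):
        await task                      # wait until it has really finished
    return conn.closed, task.done(), len(ticks) > 0


if __name__ == '__main__':
    print(asyncio.run(supervisor()))
```

Because the supervisor awaits the gathered task before returning, nothing is left pending when the event loop shuts down.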