
asyncio CancelledError and KeyboardInterrupt

I'm trying two ways to stop an infinite loop:

  • supervisor_1: the task is cancelled programmatically
  • supervisor_2: the task is stopped with Ctrl+C

While supervisor_2 does not throw any errors when interrupted, I cannot stop supervisor_1 from producing the warning "Task was destroyed but it is pending!". Any idea why?

Here is the code:

import asyncio
import aioredis
from functools import partial



class Listener:
    def __init__(self, redis_conn):
        self.redis_conn = redis_conn

    async def forever(self, loop_name):
        counter = 0
        try:
            while True:
                print('{}: {}'.format(loop_name, counter))
                counter += 1
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print('Task Cancelled')
            self.redis_conn.close()
            await self.redis_conn.wait_closed()


async def supervisor_1(redis_conn):
    redis_conn = await redis_conn

    l = Listener(redis_conn)

    task = asyncio.ensure_future(
        asyncio.gather(l.forever('loop_1'), 
                       l.forever('loop_2')))
    await asyncio.sleep(2)
    task.cancel()


async def supervisor_2(redis_conn):
    redis_conn = await redis_conn

    l = Listener(redis_conn)
    await asyncio.gather(l.forever('loop_1'), 
                         l.forever('loop_2'))


if __name__ == '__main__':
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1)

    loop = asyncio.get_event_loop()
    run = partial(supervisor_2, redis_conn=redis_conn)
    task = asyncio.ensure_future(run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print('Interrupted!')
        task.cancel()
        loop.run_forever()
    finally:
        loop.close()

@update:

Thanks to @Gerasimov, here is a version that fixes the problem but somehow still raises errors from time to time on KeyboardInterrupt:

from contextlib import suppress


async def supervisor(redis_conn):
    redis_conn = await redis_conn

    l = Listener(redis_conn)

    task = asyncio.ensure_future(
        asyncio.gather(l.forever('loop_1'), 
                       l.forever('loop_2'))
    )
    await asyncio.sleep(10)
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task

async def kill_tasks():
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task 

and

if __name__ == '__main__':
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1)

    loop = asyncio.get_event_loop()
    run = partial(supervisor, redis_conn=redis_conn)
    task = asyncio.ensure_future(run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print('Interrupted!')
        loop.run_until_complete(kill_tasks())
    finally:
        loop.close()
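
A note on the kill_tasks helper above: asyncio.Task.all_tasks() also returns the task that is running kill_tasks itself, so the helper ends up cancelling and awaiting itself, and that classmethod was removed in Python 3.9. Whether this explains the intermittent errors is not certain. The following is only a sketch, assuming Python 3.7+ (asyncio.run, asyncio.all_tasks, asyncio.current_task); ticker and cancel_other_tasks are hypothetical names, not part of the original code.

```python
import asyncio


async def ticker(ticks):
    # Hypothetical stand-in for Listener.forever: loops until cancelled.
    while True:
        ticks.append(1)
        await asyncio.sleep(0.01)


async def cancel_other_tasks():
    # Skip the task running this coroutine so it never cancels itself.
    current = asyncio.current_task()
    others = [t for t in asyncio.all_tasks() if t is not current]
    for task in others:
        task.cancel()
    # return_exceptions=True collects each CancelledError as a result
    # instead of raising it here.
    await asyncio.gather(*others, return_exceptions=True)
    return others


async def main():
    ticks = []
    for _ in range(3):
        asyncio.ensure_future(ticker(ticks))
    await asyncio.sleep(0.03)
    cancelled = await cancel_other_tasks()
    return [t.cancelled() for t in cancelled]


states = asyncio.run(main())
print(states)
```

Requesting cancellation for every task first and then awaiting them together avoids waiting on one task while another is still running uncancelled.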
Orelus asked May 05 '17 12:05


1 Answer

task.cancel() itself doesn't finish the task: it only requests that CancelledError be raised inside the task and returns immediately. After calling it, you should await the task until it has actually been cancelled (that is, until it raises CancelledError).

You also shouldn't suppress CancelledError inside the task.
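
To illustrate the second point, here is a minimal sketch (assuming Python 3.7+ for asyncio.run) of a loop that still cleans up on cancellation but lets CancelledError propagate by using try/finally. FakeConnection is a hypothetical stand-in for the Redis pool in the question, not an aioredis API.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for the Redis pool used in the question."""

    def __init__(self):
        self.closed = False

    async def close(self):
        await asyncio.sleep(0)  # pretend to wait for the connection to close
        self.closed = True


async def forever(conn, loop_name):
    counter = 0
    try:
        while True:
            counter += 1
            await asyncio.sleep(0.01)
    finally:
        # Runs on cancellation too; CancelledError keeps propagating
        # afterwards because nothing here catches it.
        await conn.close()


async def main():
    conn = FakeConnection()
    task = asyncio.ensure_future(forever(conn, 'loop_1'))
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # the caller, not the task, decides to swallow the error
    return conn.closed, task.cancelled()


result = asyncio.run(main())
print(result)
```

Because the task does not swallow the exception, it ends in the cancelled state and the awaiting code can tell that cancellation really happened.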

See this answer, where I tried to show different ways of working with tasks. For example, to cancel a task and wait until it has been cancelled, you can do:

import asyncio
from contextlib import suppress


task = ...  # remember, the task must not suppress CancelledError itself

task.cancel()  # returns immediately; we still have to await the task until it raises CancelledError

with suppress(asyncio.CancelledError):
    await task  # or loop.run_until_complete(task) if this happens after the event loop has stopped

# Now that we have awaited the CancelledError and handled it,
# the task is finally over and we can close the event loop without a warning.
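
The same pattern can be put into a self-contained, runnable form. This is only a sketch, assuming Python 3.7+ (asyncio.run); the Redis connection is left out, the log list is a hypothetical addition used to observe what happened, and the sleeps are shortened.

```python
import asyncio
from contextlib import suppress


async def forever(loop_name, log):
    counter = 0
    try:
        while True:
            log.append((loop_name, counter))
            counter += 1
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append((loop_name, 'cancelled'))
        raise  # re-raise so the task is not left looking like a normal finish


async def supervisor(log):
    task = asyncio.ensure_future(
        asyncio.gather(forever('loop_1', log),
                       forever('loop_2', log)))
    await asyncio.sleep(0.05)
    task.cancel()  # only requests cancellation
    with suppress(asyncio.CancelledError):
        await task  # wait until both loops have actually stopped
    return task.done()


log = []
finished = asyncio.run(supervisor(log))
print(finished, log[-2:])
```

Once supervisor returns, nothing is left pending, so the event loop can close without the "Task was destroyed but it is pending!" warning.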
Mikhail Gerasimov answered Oct 20 '22 00:10