Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Cancel nested coroutines in python asyncio

In my application, I have a coroutine, which may await on several other coroutines, and each if this coroutines, may await on another ones and so on. If one of such coroutines fails, there is no need to execute all others, that was not executed yet. (In my case this is even harmful, and I want to launch several rollback coroutines instead). So, how do I cancel execution of all nested coroutines? Here is what I have for now:

import asyncio

async def foo():
    for i in range(5):
        print('Foo', i)
        await asyncio.sleep(0.5)
    print('Foo2 done')

async def bar():
    await asyncio.gather(bar1(), bar2())


async def bar1():
    await asyncio.sleep(1)
    raise Exception('Boom!')


async def bar2():
    for i in range(5):
        print('Bar2', i)
        await asyncio.sleep(0.5)
    print('Bar2 done')


async def baz():
    for i in range(5):
        print('Baz', i)
        await asyncio.sleep(0.5)

async def main():
    task_foo = asyncio.Task(foo())
    task_bar = asyncio.Task(bar())
    try:
        await asyncio.gather(task_foo, task_bar)
    except Exception:
        print('One task failed. Canceling all')
        task_foo.cancel()
        task_bar.cancel()
    print('Now we want baz')
    await baz()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

This obviously does not work. As you can see, foo coroutine is canceled, as I want, but bar2 is still running:

Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4

So, I'm definitely doing something wrong. What is the correct approach here?

like image 520
user6743038 Avatar asked Oct 29 '25 19:10

user6743038


1 Answers

By the time you call task_bar.cancel() the task is already finished, so there's no effect. As the gather docs state:

If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

This is exactly what's happening, a slight modification of your task_bar coroutine to:

async def bar():
    try:
        await asyncio.gather(bar1(), bar2())
    except Exception:
        print("Got a generic exception on bar")
        raise

Outputs:

Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
Got a generic exception on bar
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Bar2 3
Baz 1
Bar2 4
Baz 2
Bar2 done
Baz 3
Baz 4

I'm also printing task_bar before the task_bar.cancel() call, notice it's finished, so calling cancel has no effect.

In terms of a solution I think the spawning coroutine needs to handle the cancellation of the coroutines it scheduled as I couldn't find a way to retrieve them once the coroutine is finished (beyond abusing Task.all_tasks which sounds wrong).

Having said that I had to use wait instead of gather and return on first exception, here's a full example:

import asyncio


async def foo():
    for i in range(5):
        print('Foo', i)
        await asyncio.sleep(0.5)
    print('Foo done')


async def bar():
    done, pending = await asyncio.wait(
        [bar1(), bar2()], return_when=asyncio.FIRST_EXCEPTION)

    for task in pending:
        task.cancel()

    for task in done:
        task.result()  # needed to raise the exception if it happened


async def bar1():
    await asyncio.sleep(1)
    raise Exception('Boom!')


async def bar2():
    for i in range(5):
        print('Bar2', i)
        await asyncio.sleep(0.5)
    print('Bar2 done')


async def baz():
    for i in range(5):
        print('Baz', i)
        await asyncio.sleep(0.5)


async def main():
    task_foo = asyncio.Task(foo())
    task_bar = asyncio.Task(bar())
    try:
        await asyncio.gather(task_foo, task_bar)
    except Exception:
        print('One task failed. Canceling all')
        print(task_bar)
        task_foo.cancel()
        task_bar.cancel()

    print('Now we want baz')
    await baz()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

Which outputs:

Foo 0
Bar2 0
Foo 1
Bar2 1
Foo 2
Bar2 2
One task failed. Canceling all
<Task finished coro=<bar() done, defined at cancel_nested_coroutines_2.py:11> exception=Exception('Boom!',)>
Now we want baz
Baz 0
Baz 1
Baz 2
Baz 3
Baz 4

It's not great, but it works.

like image 82
Yeray Diaz Avatar answered Oct 31 '25 10:10

Yeray Diaz



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!