How to correctly handle cancelled tasks in Python's `asyncio.gather`

So I'm taking another stab at the asyncio module now that 3.8 is out. However, I am getting unexpected results when trying to do a graceful shutdown of the event loop. Specifically, I am listening for a SIGINT, cancelling the running Tasks, gathering those Tasks, and then calling .stop() on the event loop. I know that Tasks raise a CancelledError when they are cancelled, which will propagate up and end my call to asyncio.gather unless, according to the documentation, I pass return_exceptions=True. That should cause gather to wait for all the Tasks to cancel and return a list of CancelledErrors. However, it appears that return_exceptions=True still results in an immediate interruption of my gather call if I try to gather cancelled Tasks.

Here is the code to reproduce the effect. I am running Python 3.8.0:

# demo.py

import asyncio
import random
import signal


async def worker():
    sleep_time = random.random() * 3
    await asyncio.sleep(sleep_time)
    print(f"Slept for {sleep_time} seconds")

async def dispatcher(queue):
    while True:
        await queue.get()
        asyncio.create_task(worker())
        tasks = asyncio.all_tasks()
        print(f"Running Tasks: {len(tasks)}")

async def shutdown(loop):
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    print(f"Cancelling {len(tasks)} outstanding tasks")
    results = await asyncio.gather(*tasks, return_exceptions=True)
    print(f"results: {results}")
    loop.stop()

async def main():
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(shutdown(loop)))
    queue = asyncio.Queue()
    asyncio.create_task(dispatcher(queue))

    while True:
        await queue.put('tick')
        await asyncio.sleep(1)


asyncio.run(main())

Output:

>> python demo.py 
Running Tasks: 3
Slept for 0.3071352174511871 seconds
Running Tasks: 3
Running Tasks: 4
Slept for 0.4152310498820644 seconds
Running Tasks: 4
^CCancelling 4 outstanding tasks
Traceback (most recent call last):
  File "demo.py", line 38, in <module>
    asyncio.run(main())
  File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
asyncio.exceptions.CancelledError

I'm guessing there is still something about the event loop that I don't understand, but I would expect all the CancelledErrors to come back as an array of objects stored in results and then be able to continue on rather than to see an error immediately.

Max Taggart asked Jan 08 '20 23:01

1 Answer

What causes the error?

The problem with asyncio.all_tasks() is that it returns ALL tasks, even those you didn't create directly. Change your code the following way to see what you cancel:

for task in tasks:
    print(task)
    task.cancel()

You'll see not only worker related tasks, but also:

<Task pending coro=<main() running at ...>

Cancelling main means the future that asyncio.run(main()) is waiting on ends with CancelledError, which is the traceback you see. Let's make a quick and dirty modification to exclude this task from cancellation:

tasks = [
    t
    for t in asyncio.all_tasks()
    if (
        t is not asyncio.current_task()
        # Task.get_coro() is the public accessor (Python 3.8+) for t._coro
        and t.get_coro().__name__ != 'main'
    )
]

for task in tasks:
    print(task)
    task.cancel()

Now you'll see your results.
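
To see the expected behaviour in isolation, here is a minimal sketch in which only the workers are cancelled and main() is left alone. The worker(delay) signature is a stand-in for illustration, not the asker's exact code.

```python
import asyncio


async def worker(delay):
    # Stand-in for the question's worker(); sleeps until cancelled.
    await asyncio.sleep(delay)


async def main():
    tasks = [asyncio.create_task(worker(10)) for _ in range(3)]
    await asyncio.sleep(0)  # yield once so the workers start running
    for task in tasks:
        task.cancel()
    # main() itself is not cancelled, so gather() can finish and report
    # each worker's CancelledError as an ordinary list element.
    return await asyncio.gather(*tasks, return_exceptions=True)


results = asyncio.run(main())
print(results)
```

Each element of results should be a CancelledError instance, which is what the question expected from return_exceptions=True.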

loop.stop() leads to an error

Although you now get your results, you will hit another error: Event loop stopped before Future completed. It happens because asyncio.run(main()) wants to run until main() has finished.

You have to restructure your code so that the coroutine you passed into asyncio.run can finish instead of stopping the event loop, or, for example, use loop.run_forever() instead of asyncio.run.

Here's a quick and dirty demonstration of what I mean:

async def shutdown(loop):
    # ...

    global _stopping
    _stopping = True
    # loop.stop()

_stopping = False

async def main():
    # ...

    while not _stopping:
        await queue.put('tick')
        await asyncio.sleep(1)

Now your code will work without errors. Don't use the code above in practice; it's just an example. Try to restructure your code as I mentioned above.
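
One possible restructuring, as a rough sketch rather than a definitive design: an asyncio.Event replaces the global flag, and main() returns on its own once the event is set, so loop.stop() is never needed. The timer below is an assumption that stands in for Ctrl+C so the sketch runs unattended; in a real program the signal handler would call stop.set instead.

```python
import asyncio


async def worker(n):
    await asyncio.sleep(10)  # stand-in for real work


async def main():
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    # In the real program this line would be:
    #     loop.add_signal_handler(signal.SIGINT, stop.set)
    # A timer stands in for the signal here (assumption for the demo).
    loop.call_later(0.2, stop.set)

    workers = []
    while not stop.is_set():
        workers.append(asyncio.create_task(worker(len(workers))))
        try:
            # Pause between ticks, but wake early if shutdown is requested.
            await asyncio.wait_for(stop.wait(), timeout=0.05)
        except asyncio.TimeoutError:
            pass

    # Cancel only the tasks created here, then wait for them to finish.
    for task in workers:
        task.cancel()
    # main() returns normally, so asyncio.run() exits without an error.
    return await asyncio.gather(*workers, return_exceptions=True)


results = asyncio.run(main())
print(f"cancelled {len(results)} workers")
```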

How to correctly handle tasks

Don't use asyncio.all_tasks().

If you create a task that you want to cancel in the future, store it and cancel only the stored tasks. Pseudocode:

i_created = []

# ...

task = asyncio.create_task(worker())
i_created.append(task)

# ...

for task in i_created:
    task.cancel()

It may not seem convenient, but it's a way to make sure you don't cancel something you don't want to be cancelled.
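
The pseudocode above can be turned into something runnable along these lines. This is only a sketch: background_tasks, spawn and cancel_spawned are hypothetical names chosen for illustration. A set with a done-callback is used instead of a plain list so that finished tasks do not pile up.

```python
import asyncio

# Hypothetical registry; holds only tasks this program created itself.
background_tasks = set()


def spawn(coro):
    """Create a task and remember it until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the task from the registry as soon as it is done.
    task.add_done_callback(background_tasks.discard)
    return task


async def cancel_spawned():
    """Cancel every tracked task and wait for the cancellations to land."""
    tasks = list(background_tasks)
    for task in tasks:
        task.cancel()
    return await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    spawn(asyncio.sleep(0.01))  # finishes on its own
    spawn(asyncio.sleep(10))    # still pending at shutdown
    spawn(asyncio.sleep(10))
    await asyncio.sleep(0.1)
    pending_before = len(background_tasks)
    results = await cancel_spawned()
    return pending_before, results


pending_before, results = asyncio.run(main())
print(pending_before, results)
```

Because main() is never in the registry, it cannot be cancelled by accident, which is the failure in the original question.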

One more thing

Note also that asyncio.run() does much more than just start the event loop. In particular, it cancels all hanging tasks before finishing. That may be useful in some cases, although I advise handling all cancellations manually instead.
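
A small sketch of that cleanup behaviour: main() returns while a task it started is still sleeping, and asyncio.run() cancels the leftover task before closing the loop. The straggler coroutine and the events list are made up for the demonstration.

```python
import asyncio

events = []


async def straggler():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        events.append("straggler cancelled")
        raise  # re-raise so the task is marked as cancelled


async def main():
    asyncio.create_task(straggler())
    await asyncio.sleep(0)  # give the straggler a chance to start
    events.append("main finished")


asyncio.run(main())
print(events)  # ['main finished', 'straggler cancelled']
```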

Mikhail Gerasimov answered Nov 14 '22 21:11