So I'm taking another stab at the asyncio module now that 3.8 is out. However, I am getting unexpected results when trying to do a graceful shutdown of the event loop. Specifically, I am listening for a SIGINT, cancelling the running Tasks, gathering those Tasks, and then .stop()ing the event loop. I know that Tasks raise a CancelledError when they are cancelled, which will propagate up and end my call to asyncio.gather unless, according to the documentation, I pass return_exceptions=True to asyncio.gather, which should cause gather to wait for all the Tasks to cancel and return an array of CancelledErrors. However, it appears that return_exceptions=True still results in an immediate interruption of my gather call if I try to gather cancelled Tasks.
Here is the code to reproduce the effect. I am running python 3.8.0:
# demo.py
import asyncio
import random
import signal


async def worker():
    sleep_time = random.random() * 3
    await asyncio.sleep(sleep_time)
    print(f"Slept for {sleep_time} seconds")


async def dispatcher(queue):
    while True:
        await queue.get()
        asyncio.create_task(worker())
        tasks = asyncio.all_tasks()
        print(f"Running Tasks: {len(tasks)}")


async def shutdown(loop):
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    print(f"Cancelling {len(tasks)} outstanding tasks")
    results = await asyncio.gather(*tasks, return_exceptions=True)
    print(f"results: {results}")
    loop.stop()


async def main():
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(shutdown(loop)))
    queue = asyncio.Queue()
    asyncio.create_task(dispatcher(queue))

    while True:
        await queue.put('tick')
        await asyncio.sleep(1)


asyncio.run(main())
Output:
>> python demo.py
Running Tasks: 3
Slept for 0.3071352174511871 seconds
Running Tasks: 3
Running Tasks: 4
Slept for 0.4152310498820644 seconds
Running Tasks: 4
^CCancelling 4 outstanding tasks
Traceback (most recent call last):
  File "demo.py", line 38, in <module>
    asyncio.run(main())
  File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
asyncio.exceptions.CancelledError
I'm guessing there is still something about the event loop that I don't understand, but I would expect all the CancelledErrors to come back as an array of objects stored in results and then be able to continue on rather than to see an error immediately.
A Task is created and scheduled for execution through the asyncio.create_task() function. Once scheduled, a Task can be asked to cancel through its task.cancel() method.
asyncio.run() should be used as a main entry point for asyncio programs, and should ideally only be called once. It is new in version 3.7.
The problem with using asyncio.all_tasks() is that it returns ALL tasks, even those you didn't create directly. Change your code in the following way to see what you cancel:
for task in tasks:
    print(task)
    task.cancel()
You'll see not only worker-related tasks, but also:
<Task pending coro=<main() running at ...>
Cancelling main leads to a mess inside asyncio.run(main()), and you get the error. Let's make a quick-and-dirty modification to exclude this task from cancellation:
tasks = [
    t
    for t
    in asyncio.all_tasks()
    if (
        t is not asyncio.current_task()
        and t._coro.__name__ != 'main'
    )
]
for task in tasks:
    print(task)
    task.cancel()
Now you'll see your results.
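To make the point about asyncio.all_tasks() concrete, here is a minimal sketch, separate from the question's demo.py. It lists the coroutine names behind every task on the loop, then cancels everything except the current task and gathers the rest. The worker coroutine is a stand-in, and the sketch uses the public Task.get_coro() method (available since Python 3.8) rather than the private _coro attribute.

```python
import asyncio


async def worker():
    # Stand-in for real work; long enough to still be pending when cancelled.
    await asyncio.sleep(60)


async def main():
    for _ in range(2):
        asyncio.create_task(worker())
    await asyncio.sleep(0)  # yield once so the workers get started

    # all_tasks() reports every unfinished task on the loop,
    # including the task that asyncio.run() created for main() itself.
    names = sorted(t.get_coro().__name__ for t in asyncio.all_tasks())

    # Here the current task IS main(), so excluding it is enough.
    others = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for t in others:
        t.cancel()

    # Because the gathering task itself is not cancelled, gather() waits for
    # the children and hands back their CancelledError instances.
    results = await asyncio.gather(*others, return_exceptions=True)
    return names, results


names, results = asyncio.run(main())
print(names)
print(results)
```

In the question's code the gather runs inside a separate shutdown task, so excluding the current task does not exclude main(), which is why main() ends up cancelled there.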
While you now get results, you will hit another error: Event loop stopped before Future completed. It happens because asyncio.run(main()) wants to run until main() has finished.
You have to restructure your code so that the coroutine you passed into asyncio.run can finish instead of stopping the event loop, or, for example, use loop.run_forever() instead of asyncio.run.
Here's a quick-and-dirty demonstration of what I mean:
async def shutdown(loop):
    # ...
    global _stopping
    _stopping = True
    # loop.stop()


_stopping = False


async def main():
    # ...
    while not _stopping:
        await queue.put('tick')
        await asyncio.sleep(1)
Now your code will work without errors. Don't use the code above in practice; it's just an example. Try to restructure your code as I mentioned above.
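One possible way to do that restructuring, sketched under a few assumptions: an asyncio.Event replaces the global flag, the signal handler only sets the event, and main() does the cancelling and gathering itself before returning normally. The stop_after parameter is a hypothetical hook added so the sketch can be run without pressing Ctrl-C; it is not part of the original code.

```python
import asyncio
import signal


async def worker():
    # Stand-in for real work; long enough to still be pending at shutdown.
    await asyncio.sleep(60)


async def main(stop_after=None):
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()

    try:
        # The handler only requests shutdown; it does not stop the loop.
        loop.add_signal_handler(signal.SIGINT, stop.set)
    except (NotImplementedError, RuntimeError):
        # Signal handlers are unavailable on some platforms or threads.
        pass

    if stop_after is not None:
        # Hypothetical hook: request shutdown from a timer instead of Ctrl-C.
        loop.call_later(stop_after, stop.set)

    workers = [asyncio.create_task(worker()) for _ in range(3)]

    await stop.wait()  # main() stays alive until shutdown is requested

    for task in workers:
        task.cancel()
    # main() itself is not cancelled, so gather() collects the CancelledErrors.
    results = await asyncio.gather(*workers, return_exceptions=True)
    return results  # returning lets asyncio.run() finish cleanly


results = asyncio.run(main(stop_after=0.1))
print(f"results: {results}")
```

Because main() returns instead of the loop being stopped underneath it, asyncio.run() has nothing left to complain about.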
Don't use asyncio.all_tasks().
If you create a task that you want to cancel in the future, store it and cancel only the stored tasks. Pseudocode:
i_created = []

# ...

task = asyncio.create_task(worker())
i_created.append(task)

# ...

for task in i_created:
    task.cancel()
It may not seem convenient, but it's a way to make sure you don't cancel something you don't want to be cancelled.
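A runnable version of that pseudocode might look like the sketch below. The helper names spawn and cancel_created are hypothetical, and a set with a done-callback is used instead of a list so that finished tasks drop out of the registry on their own. The bystander task shows that tasks you did not register are left alone.

```python
import asyncio

i_created = set()  # only tasks registered here are ever cancelled


def spawn(coro):
    # Hypothetical helper: create a task and remember it until it finishes.
    task = asyncio.create_task(coro)
    i_created.add(task)
    task.add_done_callback(i_created.discard)
    return task


async def worker():
    await asyncio.sleep(60)  # stand-in for real work


async def cancel_created():
    # Hypothetical helper: cancel the tracked tasks and wait for them to end.
    tasks = list(i_created)
    for task in tasks:
        task.cancel()
    return await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    for _ in range(3):
        spawn(worker())
    # Created directly, not through spawn(), so it is never cancelled here.
    bystander = asyncio.create_task(asyncio.sleep(0.05, result="untouched"))

    results = await cancel_created()
    return results, await bystander


results, bystander_result = asyncio.run(main())
print(results)
print(bystander_result)
```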
Note also that asyncio.run() does much more than just start the event loop. In particular, it cancels all hanging tasks before finishing. This may be useful in some cases, although I advise handling all cancellations manually instead.
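The cleanup that asyncio.run() performs can be observed with a small sketch. The straggler coroutine and the events list are illustrative names only; the task is deliberately left pending when main() returns.

```python
import asyncio

events = []


async def straggler():
    try:
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        # Reached when asyncio.run() cancels leftover tasks during cleanup.
        events.append("straggler cancelled")
        raise


async def main():
    asyncio.create_task(straggler())
    await asyncio.sleep(0)  # yield once so the straggler starts sleeping
    events.append("main finished")


asyncio.run(main())
print(events)
```

The straggler is cancelled only after main() has returned, which is the implicit behaviour the answer suggests not relying on.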