I'm trying to figure out whether it's possible to throw a custom exception into a running asyncio task, similar to what Task.cancel(self) does when it schedules a CancelledError to be raised in the underlying coroutine.

I came across Task.get_coro().throw(exc), but calling it seems like opening a big can of worms, since it may leave the task in a bad state, especially considering all the machinery involved when a task throws CancelledError into its coroutine.

Consider the following example:

    import asyncio

    class Reset(Exception):
        pass

    async def infinite():
        while True:
            try:
                print('work')
                await asyncio.sleep(1)
                print('more work')
            except Reset:
                print('reset')
                continue
            except asyncio.CancelledError:
                print('cancel')
                break

    async def main():
        infinite_task = asyncio.create_task(infinite())
        await asyncio.sleep(0)  # Allow infinite_task to enter its work loop.
        infinite_task.get_coro().throw(Reset())
        await infinite_task

    asyncio.run(main())

    ## OUTPUT ##
    # "work"
    # "reset"
    # "work"
    # hangs forever ... bad :(

Is what I'm trying to do even feasible? It feels as if I shouldn't be manipulating the underlying coroutine like this. Is there a workaround?

There's no supported way to throw a custom exception into a running task. You shouldn't mess with .throw: it's an implementation detail, and calling it directly will probably break something.
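
To see why the example in the question hangs, here is a minimal sketch that reproduces it with a timeout so it can exit. The names worker, demo, log, leaked and stuck are made up for illustration, and the short delays are arbitrary. The point is that throw() resumes the coroutine behind the task's back: the future yielded by the second sleep is returned to whoever called throw(), so the task never learns about it.

```python
import asyncio


class Reset(Exception):
    pass


async def worker(log):
    # Same shape as infinite() in the question, but records to a list.
    while True:
        try:
            log.append('work')
            await asyncio.sleep(0.05)
            log.append('more work')
        except Reset:
            log.append('reset')


async def demo():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # let the worker reach its first sleep

    # throw() runs the coroutine right here, outside the task's control.
    # The worker handles Reset, loops, and awaits a second sleep; the
    # future yielded by that await is handed back to us, not to the task.
    leaked = task.get_coro().throw(Reset())

    # The task is still registered on the first sleep's future, which
    # (in CPython) never completes because sleep() cancels its timer
    # when an exception passes through it.
    done, pending = await asyncio.wait([task], timeout=0.3)
    stuck = task in pending

    task.cancel()  # clean up so the demo can exit
    await asyncio.gather(task, return_exceptions=True)
    return log, leaked, stuck
```

Calling asyncio.run(demo()) should give a log of 'work', 'reset', 'work' with the task still pending after the timeout, which matches the hang described in the question.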

If you want to pass information (such as a reset request) into the task, do it through an argument. Here's how it can be implemented:

    import asyncio
    from contextlib import suppress

    async def infinite(need_reset):
        try:
            while True:
                inner_task = asyncio.create_task(inner_job())
                # asyncio.wait() needs tasks or futures; passing a bare
                # coroutine is rejected since Python 3.11.
                reset_waiter = asyncio.create_task(need_reset.wait())
                await asyncio.wait(
                    [
                        reset_waiter,
                        inner_task
                    ],
                    return_when=asyncio.FIRST_COMPLETED
                )
                if need_reset.is_set():
                    print('reset')
                    await cancel(inner_task)
                    need_reset.clear()
                else:
                    # The job finished first: drop the unused waiter.
                    await cancel(reset_waiter)
        except asyncio.CancelledError:
            print('cancel')
            raise  # never suppress CancelledError, see:
                   # https://stackoverflow.com/a/33578893/1113207

    async def inner_job():
        print('work')
        await asyncio.sleep(1)
        print('more work')

    async def cancel(task):
        # more info: https://stackoverflow.com/a/43810272/1113207
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task

    async def main():
        need_reset = asyncio.Event()
        infinite_task = asyncio.create_task(infinite(need_reset))
        await asyncio.sleep(1.5)
        need_reset.set()  # request a reset while a job is in progress
        await asyncio.sleep(1.5)
        await cancel(infinite_task)

    asyncio.run(main())

Output:

    work
    more work
    work
    reset
    work
    more work
    work
    cancel
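
If several loops need the same behaviour, the race between the job and the event can be pulled out of infinite(). The following is only a sketch under a few assumptions: run_until_reset is a hypothetical helper (not part of asyncio), a log list replaces print() so the sequence is easy to inspect, and the delays are shortened arbitrarily.

```python
import asyncio
from contextlib import suppress


async def run_until_reset(job, need_reset):
    """Run coroutine `job` unless `need_reset` is set first.

    Returns True if the job ran to completion, False if a reset
    interrupted it. Hypothetical helper, not an asyncio API.
    """
    job_task = asyncio.ensure_future(job)
    waiter = asyncio.ensure_future(need_reset.wait())
    try:
        await asyncio.wait(
            [job_task, waiter], return_when=asyncio.FIRST_COMPLETED
        )
        interrupted = need_reset.is_set()
    finally:
        # Runs on normal exit and when this coroutine is itself cancelled:
        # stop whatever is still pending and wait for it to unwind.
        job_task.cancel()
        waiter.cancel()
        await asyncio.gather(job_task, waiter, return_exceptions=True)
    if interrupted:
        need_reset.clear()
        return False
    job_task.result()  # re-raise an exception from the job, if any
    return True


async def inner_job(log):
    log.append('work')
    await asyncio.sleep(0.2)
    log.append('more work')


async def infinite(need_reset, log):
    try:
        while True:
            if not await run_until_reset(inner_job(log), need_reset):
                log.append('reset')
    except asyncio.CancelledError:
        log.append('cancel')
        raise  # let the cancellation propagate


async def main():
    log = []
    need_reset = asyncio.Event()
    task = asyncio.create_task(infinite(need_reset, log))
    await asyncio.sleep(0.3)
    need_reset.set()
    await asyncio.sleep(0.3)
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task
    return log
```

With these timings, asyncio.run(main()) should return the same sequence as the output above. The try/finally in the helper also cleans up the inner job when the outer task is cancelled, instead of leaving it for asyncio.run() to collect at shutdown.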