How to throw a custom exception into a running task

I'm trying to figure out whether it's possible to throw a custom exception into a running asyncio task, similar to what Task.cancel() does when it schedules a CancelledError to be raised in the underlying coroutine.

I came across Task.get_coro().throw(exc), but calling it seems like opening a big can of worms: it may leave the task in a bad state, especially considering all the machinery involved when a task throws CancelledError into its coroutine.

Consider the following example:

import asyncio

class Reset(Exception):
    pass

async def infinite():
    while True:
        try:
            print('work')
            await asyncio.sleep(1)
            print('more work')
        except Reset:
            print('reset')
            continue
        except asyncio.CancelledError:
            print('cancel')
            break

async def main():
    infinite_task = asyncio.create_task(infinite())
    await asyncio.sleep(0)  # Allow infinite_task to enter its work loop.
    infinite_task.get_coro().throw(Reset())
    await infinite_task

asyncio.run(main())

## OUTPUT ##
# "work"
# "reset"
# "work"
# hangs forever ... bad :(

Is what I'm trying to do even feasible? It feels as if I shouldn't be manipulating the underlying coroutine like this. Is there a workaround?

Jaanus Varus asked Oct 27 '22 04:10


1 Answer

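There's no supported way to throw a custom exception into a running task. You shouldn't call .throw() on the task's coroutine yourself: the task drives the coroutine and keeps track of the future it is currently waiting on, so stepping the coroutine behind the task's back will probably break something.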

If you want to pass information (such as a reset request) into the task, do it through an argument. Here's how it can be implemented:

import asyncio
from contextlib import suppress


async def infinite(need_reset):
    try:
        while True:
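            inner_task = asyncio.create_task(inner_job())
            # asyncio.wait() needs tasks or futures; passing a bare
            # coroutine was deprecated in Python 3.8 and fails on 3.11+.
            reset_waiter = asyncio.create_task(need_reset.wait())

            await asyncio.wait(
                [
                    reset_waiter,
                    inner_task
                ],
                return_when=asyncio.FIRST_COMPLETED
            )

            if need_reset.is_set():
                print('reset')
                await cancel(inner_task)
                need_reset.clear()
            else:
                # inner_job finished on its own; drop the unused waiter.
                reset_waiter.cancel()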
    except asyncio.CancelledError:
        print('cancel')
        raise  # never suppress CancelledError, see:
               # https://stackoverflow.com/a/33578893/1113207


async def inner_job():
    print('work')
    await asyncio.sleep(1)
    print('more work')


async def cancel(task):
    # more info: https://stackoverflow.com/a/43810272/1113207
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task


async def main():
    need_reset = asyncio.Event()
    infinite_task = asyncio.create_task(infinite(need_reset))

    await asyncio.sleep(1.5)
    need_reset.set()

    await asyncio.sleep(1.5)
    await cancel(infinite_task)


asyncio.run(main())

Output:

work
more work
work
reset
work
more work
work
cancel
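
The same idea can be factored into a small helper, which may be easier to reuse and test. This is only a sketch of the pattern above, not a standard asyncio API: the names run_until_reset, worker and demo are hypothetical, the delays are shortened, and events are collected in a list instead of being printed.

```python
import asyncio


async def run_until_reset(job, need_reset):
    """Run job() once. Return True if need_reset interrupted it.

    Hypothetical helper: `job` is a zero-argument coroutine function and
    `need_reset` is an asyncio.Event shared with whoever requests resets.
    """
    job_task = asyncio.create_task(job())
    reset_waiter = asyncio.create_task(need_reset.wait())
    try:
        await asyncio.wait(
            {job_task, reset_waiter},
            return_when=asyncio.FIRST_COMPLETED,
        )
        if job_task.done():
            job_task.result()  # re-raise an exception from the job, if any
            return False
        need_reset.clear()
        return True
    finally:
        # Cancel whatever is still pending. This also runs when the caller
        # is cancelled while waiting, so no helper task is left behind.
        job_task.cancel()
        reset_waiter.cancel()
        await asyncio.gather(job_task, reset_waiter, return_exceptions=True)


async def demo():
    events = []
    need_reset = asyncio.Event()

    async def job():
        events.append('work')
        await asyncio.sleep(0.2)
        events.append('more work')

    async def worker():
        while True:
            if await run_until_reset(job, need_reset):
                events.append('reset')

    worker_task = asyncio.create_task(worker())
    await asyncio.sleep(0.3)
    need_reset.set()  # interrupts the second run of job()
    await asyncio.sleep(0.3)
    worker_task.cancel()
    await asyncio.gather(worker_task, return_exceptions=True)
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With these timings the recorded events should follow the same sequence as the output above, minus the final 'cancel' line, since the worker here does not catch CancelledError at all and simply lets it propagate.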

Mikhail Gerasimov answered Nov 15 '22 05:11