I just discovered the new features of Python 3.11, like ExceptionGroup and TaskGroup, and I'm confused by the following TaskGroup behavior: if one or more tasks inside the group fail, all the other tasks are cancelled, and I have no way to change that behavior. Example:
import asyncio

async def f_error():
    raise ValueError()

async def f_normal(arg):
    print('starting', arg)
    await asyncio.sleep(1)
    print('ending', arg)

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(f_normal(1))
        tg.create_task(f_normal(2))
        tg.create_task(f_error())

asyncio.run(main())

# starting 1
# starting 2
#----------
#< traceback of the error here >
In the example above, I cannot get "ending 1" and "ending 2" printed. It would be very useful to have something like the asyncio.gather(return_exceptions=True) option, so that the remaining tasks are not cancelled when an error occurs.
You can say "just do not use TaskGroup if you do not want this cancellation behavior", but the answer is that I want to use the new exception groups feature, and it's strictly bound to TaskGroup.
So the questions are:
update
I have just released the functionality in this answer in the extraasync Python package. Just run pip install extraasync and then from extraasync import ExtraTaskGroup to get the class in this answer. (As it makes use of an internal attribute, the package's tests explicitly verify that the approach is still valid; feel free to run the tests with whichever Python runtime you are going to use in a production project.)
original answer
As answered by Paul Cornelius, the TaskGroup class is carefully engineered to cancel itself and all its tasks at the moment when any task in it (registered with tg.create_task) raises an exception.
My understanding is that a "forgiving" task group, one that awaits all the other tasks on exit from its context (the end of the async with block) even if one or more of the tasks created in it fail, would still be useful, and that is the functionality you want.
I tinkered with the source code of TaskGroup, and I think the minimal change that gives a forgiving task group is neutering its internal _abort method. This method is called when a task's exception is handled, and all it does is loop through the tasks that are not yet done and cancel them. Tasks that are not cancelled are still awaited at the end of the with block, and that is what we get by preventing _abort from running.
Keep in mind that, as _abort starts with an underscore, it is an implementation detail, and the mechanism for aborting might change inside TaskGroup, even during the Python 3.11 lifetime.
For now, I could get it working like this:
import asyncio

class ForgivingTaskGroup(asyncio.TaskGroup):
    # Replace the private hook that cancels the remaining tasks with a no-op.
    _abort = lambda self: None

async def f_error():
    print("starting error")
    raise RuntimeError("booom")

async def f_normal(arg):
    print('starting', arg)
    await asyncio.sleep(.1)
    print('ending', arg)

async def main():
    async with ForgivingTaskGroup() as tg:
        tg.create_task(f_normal(1))
        tg.create_task(f_normal(2))
        tg.create_task(f_error())
        # await asyncio.sleep(0)

asyncio.run(main())
The stdout I got here is:
starting 1
starting 2
starting error
ending 1
ending 2
And stderr displayed the nice ASCII-art exception-group tree, by the book, but with a single exception as its child.
As other answers have pointed out, TaskGroups don't currently come with any built-in mechanism like asyncio.gather()'s return_exceptions parameter, to prevent the TaskGroup from cancelling all its tasks when one of them raises an exception. A different answer gave a way to edit TaskGroup's internal _abort method to achieve the behavior you want, but if you don't feel comfortable touching Python's internals, you could alternatively rework your coroutines so that they don't propagate their exception until all the other tasks in the group are finished.
A limited and inflexible way to accomplish this would be to use the asyncio.Barrier class, which works like this:
A barrier is a simple synchronization primitive that allows to block until a certain number of tasks are waiting on it. Tasks can wait on the wait() method and would be blocked until the specified number of tasks end up waiting on wait(). At that point all of the waiting tasks would unblock simultaneously.
So, if you know ahead of time exactly how many tasks n you're going to add to your taskgroup, and as long as you don't explicitly cancel() any individual one of those tasks (only the entire taskgroup as a whole), and as long as you also don't pass your taskgroup into one of its tasks to dynamically add more tasks into it later, you can just create a barrier that blocks until n tasks are waiting on it, and use that barrier to force all of the tasks to return or raise their Exceptions at the same time. If you have n tasks, create the barrier as asyncio.Barrier(n), and ensure that all of your tasks eventually call await barrier.wait() - this will block them until all n of your tasks are waiting at the barrier. As soon as they're all there, the barrier will let them all proceed at once. Manually adding a barrier parameter to every function header and adding the same boilerplate to handle the delayed returns and raises to every coroutine would suck though, so instead we can use a decorator for that purpose:
import asyncio

def block_coro_until_barrier_passed(coro):
    """Ensure that the supplied coroutine doesn't return or raise any error
    until the supplied barrier allows it to proceed.
    """
    async def decorated_coro(*args, barrier: asyncio.Barrier, **kwargs):
        runtime_error = None
        return_value = None
        try:
            return_value = await coro(*args, **kwargs)
        except Exception as e:
            runtime_error = e
        finally:
            # Wait here until all n tasks have a result or an exception ready.
            await barrier.wait()

        if runtime_error is not None:
            raise runtime_error
        else:
            return return_value

    return decorated_coro

@block_coro_until_barrier_passed
async def f_error():
    raise ValueError()

@block_coro_until_barrier_passed
async def f_normal(arg):
    print('starting', arg)
    await asyncio.sleep(1)
    print('ending', arg)
    return arg

async def main():
    async with asyncio.TaskGroup() as tg:
        barrier = asyncio.Barrier(3)
        tg.create_task(f_normal(1, barrier=barrier))
        tg.create_task(f_normal(2, barrier=barrier))
        tg.create_task(f_error(barrier=barrier))

if __name__ == '__main__':
    asyncio.run(main())

# starting 1
# starting 2
# ending 1
# ending 2
# --------
# traceback for ExceptionGroup
This decorator basically creates a new coroutine that runs the coroutine you decorated, intercepts the return value or Exception that was raised, then either returns that return value or raises that Exception once it's able to pass the barrier (and it will pass the barrier only once all the other tasks have got their exception or return value ready and are now waiting at the barrier). So if you decorate all your coroutines with this decorator and also make sure that you configure the barrier for the correct number of tasks n, then when your taskgroup finally exits, all of the return values will be returned at once, and all of the exceptions raised will be propagated to the final ExceptionGroup (if applicable), and none of your tasks will be cancelled early due to another task raising an exception.
If you need to use this workaround for any real-world problem though, be very careful: configuring the Barrier with too small an n will lead to the taskgroup sometimes not allowing all tasks to complete if one raises an Exception, and too large an n will lead to it hanging indefinitely. And if you cancel any of the tasks in the taskgroup, the taskgroup will hang indefinitely, because that task either never gets to await barrier.wait() or gives up its wait() if it's already there, meaning there will never be n tasks at the barrier for it to unblock. There may be a workaround to that last bit somewhere in the Barrier class, but I'm not sure.
As a final aside, I have no idea why something that accomplishes this more effectively isn't built into TaskGroup by default, because without some janky workaround like mine, TaskGroups can't fully replace gather(). There's also a bit of a gotcha with the ExceptionGroups that you'll see raised from most TaskGroups if you don't force in some workaround to prevent them from cancelling tasks as soon as one raises an Exception. The first time I read the documentation for TaskGroup, I left with the impression that the TaskGroup would neatly capture all the exceptions raised until all the tasks had completed, at which time it would raise an ExceptionGroup with all the exceptions it saw while it was running its tasks. But in reality, since TaskGroups cancel all other tasks as soon as one raises an exception, the only exceptions you'll see in that ExceptionGroup are the exceptions that are raised within that same exact iteration of the event loop after all the tasks are cancel()ed. So unless you actively try to coordinate your tasks to all raise their exceptions at the same time, you're almost always going to only see one or two exceptions in an ExceptionGroup at a time. I certainly didn't realize this at first, as I failed to note the nuances between a task being "cancelled" versus "finishing" when I first read the TaskGroup documentation:
The first time any of the tasks belonging to the group fails with an exception other than asyncio.CancelledError, the remaining tasks in the group are cancelled. [...]
Once all tasks have finished, if any tasks have failed with an exception other than asyncio.CancelledError, those exceptions are combined in an ExceptionGroup or BaseExceptionGroup (as appropriate; see their documentation) which is then raised.