Is it possible to combine async context managers in Python? Something similar to asyncio.gather, but usable with context managers. Something like this:

async def foo():
    async with asyncio.gather_cm(start_vm(), start_vm()) as (vm1, vm2):
        await vm1.do_something()
        await vm2.do_something()
Is this currently possible?
Asynchronous context managers are, fairly logically, an extension of context managers to asynchronous code, and you will find them used a lot in asyncio-based library interfaces. An asynchronous context manager is an object that can be used in an async with statement.
For comparison, contextlib.AbstractContextManager spells out the synchronous protocol: a default __enter__() is provided which returns self, while __exit__() is an abstract method which by default returns None (see also the definition of Context Manager Types; new in version 3.6). Its asynchronous counterpart, AbstractAsyncContextManager, defines __aenter__() and __aexit__() the same way.
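To make the protocol concrete, here is a minimal, purely illustrative asynchronous context manager; the class name and the sleeps are placeholders, not anything from the question:

import asyncio

class managed_resource:
    async def __aenter__(self):
        # acquire the resource asynchronously; sleep stands in for real setup
        await asyncio.sleep(0)
        return self          # the value bound by "async with ... as ..."

    async def __aexit__(self, exc_type, exc, tb):
        # release the resource; returning a falsy value lets exceptions propagate
        await asyncio.sleep(0)
        return False

async def main():
    async with managed_resource() as res:
        print('using', res)

asyncio.run(main())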
An ExitStack is (as the name suggests) a stack of clean-up functions. Adding a callback to the stack is the equivalent of calling Go's defer statement. However, clean-up functions are not executed when the function returns, but when execution leaves the with block - and until then, the stack can also be emptied again.
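As a quick, throwaway sketch of that defer-like behaviour, using the synchronous ExitStack:

from contextlib import ExitStack

def work():
    with ExitStack() as stack:
        stack.callback(print, 'cleanup 1')   # registered first, runs last
        stack.callback(print, 'cleanup 2')   # registered last, runs first
        # stack.pop_all() would move the callbacks to a fresh stack,
        # "emptying" this one so nothing runs when the block exits
        print('inside the with block')

work()
# prints: inside the with block / cleanup 2 / cleanup 1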
The contextlib module of Python's standard library provides utilities for working with the with statement, which Python uses for resource management and exception handling; AsyncExitStack, used below, is one of those utilities.
Something close to gather_cm can be achieved with AsyncExitStack, introduced in Python 3.7:
import asyncio
from contextlib import AsyncExitStack

async def foo():
    # start_vm() is the async context manager factory from the question
    async with AsyncExitStack() as stack:
        vm1, vm2 = await asyncio.gather(
            stack.enter_async_context(start_vm()),
            stack.enter_async_context(start_vm()))
        await vm1.do_something()
        await vm2.do_something()
Unfortunately, the __aexit__s will still be run sequentially. This is because AsyncExitStack simulates nested context managers, which have a well-defined order and cannot overlap. The outer context manager's __aexit__ is given information on whether the inner one raised an exception. (A database handle's __aexit__ might use this to roll back the transaction in case of an exception and commit it otherwise.) Running the __aexit__s in parallel would make the context managers overlap and the exception information unavailable or unreliable. So although gather(...) runs the __aenter__s in parallel, AsyncExitStack records which one came first and runs the __aexit__s in reverse order.
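That sequential, last-in-first-out teardown is easy to observe with a toy context manager; the names and timings below are made up purely for illustration:

import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

@asynccontextmanager
async def noisy(name):
    print('enter', name)
    try:
        yield name
    finally:
        await asyncio.sleep(0.1)    # pretend teardown takes a while
        print('exit', name)

async def demo():
    async with AsyncExitStack() as stack:
        await asyncio.gather(
            stack.enter_async_context(noisy('vm1')),
            stack.enter_async_context(noisy('vm2')))
    # typically prints "exit vm2" then "exit vm1"; the sleeps do not
    # overlap, so teardown takes about 0.2 s rather than 0.1 s

asyncio.run(demo())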
With async context managers an alternative like gather_cm would make perfect sense. One could drop the nesting semantics and provide an aggregate context manager that works like an "exit pool" rather than a stack. The exit pool takes a number of context managers that are independent of each other, which allows their __aenter__ and __aexit__ methods to be run in parallel.
The tricky part is handling exceptions correctly: if any __aenter__ raises, the exception must be propagated to prevent the with block from being run. To ensure correctness, the pool must guarantee that __aexit__ will be invoked on all the context managers whose __aenter__ has completed.
Here is an example implementation:
import asyncio
import sys

class gather_cm:
    def __init__(self, *cms):
        self._cms = cms

    async def __aenter__(self):
        futs = [asyncio.create_task(cm.__aenter__())
                for cm in self._cms]
        await asyncio.wait(futs)
        # only exit the cms we've successfully entered
        self._cms = [cm for cm, fut in zip(self._cms, futs)
                     if not fut.cancelled() and not fut.exception()]
        try:
            return tuple(fut.result() for fut in futs)
        except:
            await self._exit(*sys.exc_info())
            raise

    async def _exit(self, *args):
        # don't use gather() to ensure that we wait for all __aexit__s
        # to complete even if one of them raises; wrap in tasks because
        # asyncio.wait() no longer accepts bare coroutines
        done, _pending = await asyncio.wait(
            [asyncio.create_task(cm.__aexit__(*args))
             for cm in self._cms if cm is not None])
        return all(suppress.result() for suppress in done)

    async def __aexit__(self, *args):
        # Since the exits run in parallel, they can't see each other's
        # exceptions.  Send the exception info from the `async with`
        # body to all of them.
        return await self._exit(*args)
This test program shows how it works:
class test_cm:
    def __init__(self, x):
        self.x = x

    async def __aenter__(self):
        print('__aenter__', self.x)
        return self.x

    async def __aexit__(self, *args):
        print('__aexit__', self.x, args)

async def foo():
    async with gather_cm(test_cm('foo'), test_cm('bar')) as (cm1, cm2):
        print('cm1', cm1)
        print('cm2', cm2)

asyncio.run(foo())
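To see the exception handling as well, one can raise inside the async with body. Building on the gather_cm and test_cm definitions above, a rough sketch like this shows both __aexit__s receiving the same exception info:

async def failing():
    async with gather_cm(test_cm('foo'), test_cm('bar')) as (cm1, cm2):
        raise RuntimeError('boom')

try:
    asyncio.run(failing())
except RuntimeError as e:
    # both __aexit__s print the (RuntimeError, RuntimeError('boom'), traceback)
    # tuple, and since neither suppresses it the exception still propagates
    print('propagated:', e)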