
Multiple Async Context Managers

Is it possible to combine async context managers in python? Something similar to asyncio.gather, but able to be used with context managers. Something like this:

async def foo():
    async with asyncio.gather_cm(start_vm(), start_vm()) as (vm1, vm2):
        await vm1.do_something()
        await vm2.do_something()

Is this currently possible?

Lily Mara asked Oct 29 '18 17:10


People also ask

What is async context manager?

Asynchronous context managers are, fairly logically, an extension of the concept of context managers to work in an asynchronous environment, and you will find that they are used a lot in asyncio-based library interfaces. An asynchronous context manager is an object which can be used in an async with statement.
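To make that concrete, here is a minimal async context manager; the `Connection` class is just an illustrative name, not from any library:

```python
import asyncio

class Connection:
    """Toy async context manager: 'connects' on entry, 'closes' on exit."""
    async def __aenter__(self):
        await asyncio.sleep(0)  # stand-in for real async setup work
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0)  # stand-in for real async teardown work
        return False  # don't suppress exceptions from the with block

async def main():
    async with Connection() as conn:
        print('connected:', isinstance(conn, Connection))

asyncio.run(main())
```

Both `__aenter__` and `__aexit__` are coroutines, so they can await I/O, which is exactly what a plain `with` statement cannot do.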

What does __ enter __ do in Python?

A default implementation of object.__enter__() is provided which returns self, while object.__exit__() is an abstract method which by default returns None. See also the definition of Context Manager Types. New in version 3.6.
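That excerpt describes contextlib.AbstractContextManager. A small sketch of the inherited default __enter__ (the `Resource` class name is illustrative):

```python
from contextlib import AbstractContextManager

class Resource(AbstractContextManager):
    """Inherits the default __enter__, which simply returns self."""
    def __exit__(self, exc_type, exc, tb):
        return None  # default behaviour: don't suppress exceptions

with Resource() as r:
    print(isinstance(r, Resource))  # True: the inherited __enter__ returned self
```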

What is ExitStack?

An ExitStack is (as the name suggests) a stack of clean-up functions. Adding a callback to the stack is the equivalent of calling Go's defer statement. However, clean-up functions are not executed when the function returns, but when execution leaves the with block - and until then, the stack can also be emptied again.
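A short synchronous sketch of that defer-like behaviour (the names are illustrative):

```python
from contextlib import ExitStack

order = []

def cleanup(name):
    order.append(name)

with ExitStack() as stack:
    # Like Go's defer: register clean-up callbacks to run later.
    stack.callback(cleanup, 'first registered')
    stack.callback(cleanup, 'second registered')
    order.append('inside block')

# Callbacks run when execution leaves the with block, in LIFO order:
print(order)  # ['inside block', 'second registered', 'first registered']
```

To "empty the stack again", `stack.pop_all()` transfers the pending callbacks to a new ExitStack, so the original one runs nothing on exit.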

What is Contextlib?

The contextlib module of Python's standard library provides utilities for resource allocation to the with statement. The with statement in Python is used for resource management and exception handling. Therefore, it serves as a good Context Manager.
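For example, contextlib.contextmanager turns a generator into a context manager; a toy sketch:

```python
from contextlib import contextmanager

@contextmanager
def managed(name):
    print('acquire', name)   # runs on entry to the with block
    try:
        yield name           # the value bound by `as`
    finally:
        print('release', name)  # runs on exit, even if the block raised

with managed('res') as r:
    print('using', r)
```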


1 Answer

Something close to gather_cm can be achieved with AsyncExitStack, introduced in Python 3.7:

import asyncio
from contextlib import AsyncExitStack

async def foo():
    async with AsyncExitStack() as stack:
        vm1, vm2 = await asyncio.gather(
            stack.enter_async_context(start_vm()),
            stack.enter_async_context(start_vm()))
        await vm1.do_something()
        await vm2.do_something()

Unfortunately, __aexit__s will still be run sequentially. This is because AsyncExitStack simulates nested context managers, which have a well-defined order and cannot overlap. The outer context manager's __aexit__ is given information on whether the inner one raised an exception. (A database handle's __aexit__ might use this to roll back the transaction in case of exception and commit it otherwise.) Running __aexit__s in parallel would make the context managers overlap and the exception information unavailable or unreliable. So although gather(...) runs __aenter__s in parallel, AsyncExitStack records which one came first and runs the __aexit__s in reverse order.
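The LIFO exit order is easy to observe with a small demo (the `Tracker` class is illustrative, not part of the answer's code):

```python
import asyncio
from contextlib import AsyncExitStack

class Tracker:
    """Records enter/exit events so the ordering is visible."""
    def __init__(self, name, log):
        self.name, self.log = name, log
    async def __aenter__(self):
        self.log.append(('enter', self.name))
        return self
    async def __aexit__(self, *args):
        self.log.append(('exit', self.name))

async def demo():
    log = []
    async with AsyncExitStack() as stack:
        await stack.enter_async_context(Tracker('a', log))
        await stack.enter_async_context(Tracker('b', log))
    return log

print(asyncio.run(demo()))
# [('enter', 'a'), ('enter', 'b'), ('exit', 'b'), ('exit', 'a')]
```

The last context manager entered is the first one exited, exactly as with nested `async with` statements.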

For async context managers, an alternative like gather_cm would make perfect sense. One could drop the nesting semantics and provide an aggregate context manager that works like an "exit pool" rather than a stack. The exit pool takes a number of context managers that are independent of each other, which allows their __aenter__ and __aexit__ methods to run in parallel.

The tricky part is handling exceptions correctly: If any __aenter__ raises, the exception must be propagated to prevent the with block from being run. To ensure correctness, the pool must guarantee that __aexit__ will be invoked on all the context managers whose __aenter__ has completed.

Here is an example implementation:

import asyncio
import sys

class gather_cm:
    def __init__(self, *cms):
        self._cms = cms

    async def __aenter__(self):
        futs = [asyncio.create_task(cm.__aenter__())
                for cm in self._cms]
        await asyncio.wait(futs)
        # only exit the cms we've successfully entered
        self._cms = [cm for cm, fut in zip(self._cms, futs)
                     if not fut.cancelled() and not fut.exception()]
        try:
            return tuple(fut.result() for fut in futs)
        except:
            await self._exit(*sys.exc_info())
            raise

    async def _exit(self, *args):
        if not self._cms:
            # nothing was successfully entered, so nothing to exit
            return False
        # don't use gather() to ensure that we wait for all __aexit__s
        # to complete even if one of them raises
        done, _pending = await asyncio.wait(
            [asyncio.create_task(cm.__aexit__(*args))
             for cm in self._cms])
        return all(suppress.result() for suppress in done)

    async def __aexit__(self, *args):
        # Since the exits run in parallel, they can't see each
        # other's exceptions.  Send the exception info from the
        # `async with` body to all of them.
        return await self._exit(*args)

This test program shows how it works:

class test_cm:
    def __init__(self, x):
        self.x = x
    async def __aenter__(self):
        print('__aenter__', self.x)
        return self.x
    async def __aexit__(self, *args):
        print('__aexit__', self.x, args)

async def foo():
    async with gather_cm(test_cm('foo'), test_cm('bar')) as (cm1, cm2):
        print('cm1', cm1)
        print('cm2', cm2)

asyncio.run(foo())
user4815162342 answered Nov 15 '22 16:11