
Python coroutines: Release context manager when pausing

Background: I'm a very experienced Python programmer who is completely clueless about the new coroutines/async/await features. I can't write an async "hello world" to save my life.

My question is: I am given an arbitrary coroutine function f. I want to write a coroutine function g that will wrap f, i.e. I will give g to the user as if it was f, and the user will call it and be none the wiser, since g will be using f under the hood. Like when you decorate a normal Python function to add functionality.

The functionality that I want to add: Whenever the program flow goes into my coroutine, it acquires a context manager that I provide, and as soon as program flow goes out of the coroutine, it releases that context manager. Flow comes back in? Re-acquire the context manager. It goes back out? Re-release it. Until the coroutine is completely finished.

To demonstrate, here is the described functionality with plain generators:

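# `function` (the wrapped generator function) and `self` (the context manager)
# are free variables taken from the enclosing scope.
def generator_wrapper(_, *args, **kwargs):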
    gen = function(*args, **kwargs)
    method, incoming = gen.send, None
    while True:
        with self:
            outgoing = method(incoming)
        try:
            method, incoming = gen.send, (yield outgoing)
        except Exception as e:
            method, incoming = gen.throw, e
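
For reference, here is a self-contained variant of the snippet above, as a sketch. It assumes `function` and `self` come from an enclosing decorator scope, spelled out here as the parameters `function` and `context`. The names `wrap_generator`, `Recorder`, and `echo` are illustrative only. Unlike the snippet above, it also catches `StopIteration`, because letting that exception escape a generator body is turned into a `RuntimeError` (PEP 479).

```python
from functools import wraps


def wrap_generator(function, context):
    """Run every resumption of ``function``'s generator inside ``context``."""
    @wraps(function)
    def generator_wrapper(*args, **kwargs):
        gen = function(*args, **kwargs)
        method, incoming = gen.send, None
        while True:
            try:
                with context:
                    outgoing = method(incoming)
            except StopIteration as stop:
                # the wrapped generator finished: forward its return value
                return stop.value
            try:
                method, incoming = gen.send, (yield outgoing)
            except Exception as e:
                # pass exceptions thrown into the wrapper on to the generator
                method, incoming = gen.throw, e
    return generator_wrapper


class Recorder:
    """Hypothetical context manager that logs each enter/exit."""
    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append('enter')

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.events.append('exit')
        return False


def echo():
    received = yield 1
    yield received


recorder = Recorder()
gen = wrap_generator(echo, recorder)()
values = [next(gen), gen.send('hello')]
print(values)           # [1, 'hello']
print(recorder.events)  # ['enter', 'exit', 'enter', 'exit']
```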

Is it possible to do it with coroutines?

Ram Rachum asked May 10 '19 13:05


1 Answer

Coroutines are built on iterators: the __await__ special method returns a regular iterator, and for a coroutine that iterator also supports send and throw. This allows you to wrap the underlying iterator in yet another iterator. The trick is that you must unwrap the iterator of your target using its __await__, then re-wrap your own iterator using your own __await__.
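
To see the iterator underneath, a coroutine can be driven by hand without any event loop. The following is a minimal sketch: `pause`, `target`, and `drive` are hypothetical names, and `drive` plays the role an event loop normally has by answering every yielded signal with its upper-cased form.

```python
import types


@types.coroutine
def pause(value):
    """Suspend once: hand ``value`` to the driver, return the driver's reply."""
    reply = yield value
    return reply


async def target():
    first = await pause('a')
    second = await pause('b')
    return [first, second]


def drive(coro):
    """Step through ``coro`` manually via the iterator from ``__await__``."""
    iterator = coro.__await__()
    signals, message = [], None
    try:
        while True:
            signals.append(iterator.send(message))
            message = signals[-1].upper()
    except StopIteration as stop:
        # the coroutine's return value travels on the StopIteration
        return signals, stop.value


signals, result = drive(target())
print(signals)  # ['a', 'b']
print(result)   # ['A', 'B']
```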

The core functionality that works on instantiated coroutines looks like this:

class CoroWrapper:
    """Wrap ``target`` to have every send issued in a ``context``"""
    def __init__(self, target: 'Coroutine', context: 'ContextManager'):
        self.target = target
        self.context = context

    # wrap an iterator for use with 'await'
    def __await__(self):
        # unwrap the underlying iterator
        target_iter = self.target.__await__()
        # emulate 'yield from'
        iter_send, iter_throw = target_iter.send, target_iter.throw
        send, message = iter_send, None
        while True:
            # communicate with the target coroutine
            try:
                with self.context:
                    signal = send(message)
            except StopIteration as err:
                return err.value
            else:
                send = iter_send
            # communicate with the ambient event loop
            try:
                message = yield signal
            except BaseException as err:
                send, message = iter_throw, err

Note that this explicitly works on a Coroutine, not an arbitrary Awaitable: the iterator returned by Coroutine.__await__ implements the generator interface (send and throw). In theory, an Awaitable's __await__() may return a plain iterator that provides neither send nor throw.
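
The difference can be checked directly. In this sketch, `PlainAwaitable` is a hypothetical class whose `__await__` returns a bare tuple iterator. It satisfies the `Awaitable` ABC, but its iterator has no `send`, so the wrapper above would fail on it.

```python
from collections.abc import Awaitable, Coroutine


class PlainAwaitable:
    """Hypothetical awaitable whose iterator supports only ``next``."""
    def __await__(self):
        return iter(())  # a tuple iterator: no send(), no throw()


async def native():
    return 1


plain = PlainAwaitable()
coro = native()

plain_is_awaitable = isinstance(plain, Awaitable)      # True
plain_is_coroutine = isinstance(plain, Coroutine)      # False
plain_has_send = hasattr(plain.__await__(), 'send')    # False
coro_has_send = hasattr(coro.__await__(), 'send')      # True
coro.close()  # never awaited here, so close it to avoid a warning
```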

This is enough to pass messages in and out:

import asyncio


class PrintContext:
    def __enter__(self):
        print('enter')

    def __exit__(self, exc_type, exc_val, exc_tb):
        print('exit via', exc_type)
        return False


async def main_coro():
    print(
        'wrapper returned',
        await CoroWrapper(test_coro(), PrintContext())
    )


async def test_coro(delay=0.5):
    await asyncio.sleep(delay)
    return 2

asyncio.run(main_coro())
# enter
# exit via None
# enter
# exit via <class 'StopIteration'>
# wrapper returned 2
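
Exceptions thrown in from the outside take the same route. The sketch below repeats a condensed `CoroWrapper` so it runs on its own, and drives it by hand instead of using an event loop. `Recorder`, `pause`, and `target` are hypothetical helpers. The `KeyError` thrown into the wrapper reaches the target inside the context, and the target recovers from it.

```python
import types


class CoroWrapper:
    """Condensed copy of the wrapper above, so this block is self-contained."""
    def __init__(self, target, context):
        self.target, self.context = target, context

    def __await__(self):
        target_iter = self.target.__await__()
        send, message = target_iter.send, None
        while True:
            try:
                with self.context:
                    signal = send(message)
            except StopIteration as err:
                return err.value
            send = target_iter.send
            try:
                message = yield signal
            except BaseException as err:
                send, message = target_iter.throw, err


class Recorder:
    """Hypothetical context manager that records how each block was left."""
    def __init__(self):
        self.exits = []

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.exits.append(exc_type)
        return False


@types.coroutine
def pause():
    return (yield 'paused')


async def target():
    try:
        await pause()
    except KeyError:
        return 'recovered'
    return 'finished'


recorder = Recorder()
iterator = CoroWrapper(target(), recorder).__await__()
signal = iterator.send(None)          # runs the target up to its first pause
try:
    iterator.throw(KeyError('boom'))  # delivered to the target via throw
except StopIteration as stop:
    result = stop.value

print(signal, result)   # paused recovered
print(recorder.exits)   # [None, <class 'StopIteration'>]
```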

You can delegate the wrapping part to a separate decorator. This also ensures that you have an actual coroutine, not a custom class - some async libraries require this.

from functools import wraps


def send_context(context: 'ContextManager'):
    """Wrap a coroutine to issue every send in a context"""
    def coro_wrapper(target: 'Callable[..., Coroutine]') -> 'Callable[..., Coroutine]':
        @wraps(target)
        async def context_coroutine(*args, **kwargs):
            return await CoroWrapper(target(*args, **kwargs), context)
        return context_coroutine
    return coro_wrapper
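
The "actual coroutine" point can be verified with `inspect`. This is a sketch only: `Delegating` is a simplified stand-in for a custom awaitable class such as `CoroWrapper`, and `as_coroutine_function` is a hypothetical decorator with the same shape as `send_context`, minus the context handling.

```python
import inspect
from functools import wraps


class Delegating:
    """Stand-in for a custom awaitable class such as ``CoroWrapper``."""
    def __init__(self, target):
        self.target = target

    def __await__(self):
        return self.target.__await__()


def as_coroutine_function(target):
    """Hypothetical decorator mirroring the shape of ``send_context``."""
    @wraps(target)
    async def wrapper(*args, **kwargs):
        return await Delegating(target(*args, **kwargs))
    return wrapper


async def answer():
    return 42


inner = answer()
custom = Delegating(inner)
decorated = as_coroutine_function(answer)
outer = decorated()

custom_is_coroutine = inspect.iscoroutine(custom)                   # False
outer_is_coroutine = inspect.iscoroutine(outer)                     # True
decorated_is_coro_func = inspect.iscoroutinefunction(decorated)     # True
kept_name = decorated.__name__                                      # 'answer'

# neither coroutine is awaited in this check, so close both explicitly
inner.close()
outer.close()
```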

The send_context decorator allows you to directly decorate a coroutine function:

@send_context(PrintContext())
async def test_coro(delay=0.5):
    await asyncio.sleep(delay)
    return 2

print('async run returned:', asyncio.run(test_coro()))
# enter
# exit via None
# enter
# exit via <class 'StopIteration'>
# async run returned: 2
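
As a final sketch, the same decorator can hold a non-reentrant `threading.Lock` only while a coroutine is actually running. Two workers share one lock. If the lock stayed held across `await`, the second worker would block the event loop forever. `worker` and `held_while_running` are hypothetical names, and the wrapper and decorator are repeated in condensed form so the block runs on its own. A blocking lock is used purely for demonstration; in real asyncio code, contention from other threads would stall the loop.

```python
import asyncio
import threading
from functools import wraps


class CoroWrapper:
    """Condensed copy of the wrapper above."""
    def __init__(self, target, context):
        self.target, self.context = target, context

    def __await__(self):
        target_iter = self.target.__await__()
        send, message = target_iter.send, None
        while True:
            try:
                with self.context:
                    signal = send(message)
            except StopIteration as err:
                return err.value
            send = target_iter.send
            try:
                message = yield signal
            except BaseException as err:
                send, message = target_iter.throw, err


def send_context(context):
    def coro_wrapper(target):
        @wraps(target)
        async def context_coroutine(*args, **kwargs):
            return await CoroWrapper(target(*args, **kwargs), context)
        return context_coroutine
    return coro_wrapper


lock = threading.Lock()  # non-reentrant: a second acquire would block
held_while_running = []


@send_context(lock)
async def worker(name, steps=3):
    for _ in range(steps):
        held_while_running.append(lock.locked())
        await asyncio.sleep(0)  # suspension point: the lock is released here
    return name


async def main():
    return await asyncio.gather(worker('a'), worker('b'))


results = asyncio.run(main())
print(results)                  # ['a', 'b']
print(all(held_while_running))  # True
print(lock.locked())            # False
```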
MisterMiyagi answered Oct 21 '22 09:10