Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Duplication of code for synchronous and asynchronous implementations

When implementing classes that have uses in both synchronous and asynchronous applications, I find myself maintaining virtually identical code for both use cases.

Just as an example, consider:

from time import sleep
import asyncio


class UselessExample:
    def __init__(self, delay):
        self.delay = delay

    async def a_ticker(self, to):
        for i in range(to):
            yield i
            await asyncio.sleep(self.delay)

    def ticker(self, to):
        for i in range(to):
            yield i
            sleep(self.delay)


def func(ue):
    for value in ue.ticker(5):
        print(value)


async def a_func(ue):
    async for value in ue.a_ticker(5):
        print(value)


def main():
    ue = UselessExample(1)
    func(ue)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(a_func(ue))


if __name__ == '__main__':
    main()

In this example, it's not too bad, the ticker methods of UselessExample are easy to maintain in tandem, but you can imagine that exception handling and more complicated functionality can quickly grow a method and make it more of an issue, even though both methods can remain virtually identical (only replacing certain elements with their asynchronous counterparts).

Assuming there's no substantial difference that makes it worth having both fully implemented, what is the best (and most Pythonic) way of maintaining a class like this and avoiding needless duplication?

like image 286
Grismar Avatar asked Mar 14 '19 00:03

Grismar


People also ask

Does async await make code synchronous?

Async/await helps you write synchronous-looking JavaScript code that works asynchronously. Await is in an async function to ensure that all promises that are returned in the function are synchronized. With async/await, there's no use of callbacks.

What is an asynchronous framework?

A. Single-threaded: Asynchronous frameworks support high concurrency, typically with a single thread. B. Event-driven: Async frameworks use non-blocking sockets, and register for notifications on them using EPoll, Kqueue, or similar.


2 Answers

There is no one-size-fits-all road to making an asyncio coroutine-based codebase useable from traditional synchronous codebases. You have to make choices per codepath.

Pick and choose from a series of tools:

Synchronous versions using asyncio.run()

Provide synchronous wrappers around coroutines, which block until the coroutine completes.

Even an async generator function such as ticker() can be handled this way, in a loop:

class UselessExample:     def __init__(self, delay):         self.delay = delay      async def a_ticker(self, to):         for i in range(to):             yield i             await asyncio.sleep(self.delay)      def ticker(self, to):         agen = self.a_ticker(to)         try:             while True:                 yield asyncio.run(agen.__anext__())         except StopAsyncIteration:             return 

These synchronous wrappers can be generated with helper functions:

from functools import wraps  def sync_agen_method(agen_method):     @wraps(agen_method)     def wrapper(self, *args, **kwargs):         agen = agen_method(self, *args, **kwargs)            try:             while True:                 yield asyncio.run(agen.__anext__())         except StopAsyncIteration:             return     if wrapper.__name__[:2] == 'a_':         wrapper.__name__ = wrapper.__name__[2:]     return wrapper      

then just use ticker = sync_agen_method(a_ticker) in the class definition.

Straight-up coroutine methods (not generator coroutines) could be wrapped with:

def sync_method(async_method):     @wraps(async_method)     def wrapper(self, *args, **kwargs):         return async.run(async_method(self, *args, **kwargs))     if wrapper.__name__[:2] == 'a_':         wrapper.__name__ = wrapper.__name__[2:]     return wrapper 

Factor out common components

Refactor out the synchronous parts, into generators, context managers, utility functions, etc.

For your specific example, pulling out the for loop into a separate generator would minimise the duplicated code to the way the two versions sleep:

class UselessExample:     def __init__(self, delay):         self.delay = delay      def _ticker_gen(self, to):         yield from range(to)      async def a_ticker(self, to):         for i in self._ticker_gen(to):             yield i             await asyncio.sleep(self.delay)      def ticker(self, to):         for i in self._ticker_gen(to):             yield i             sleep(self.delay) 

While this doesn't make much of any difference here it can work in other contexts.

Abstract Syntax Tree tranformation

Use AST rewriting and a map to transform coroutines into synchronous code. This can be quite fragile if you are not careful on how you recognise utility functions such as asyncio.sleep() vs time.sleep():

import inspect import ast import copy import textwrap import time  asynciomap = {     # asyncio function to (additional globals, replacement source) tuples     "sleep": ({"time": time}, "time.sleep") }   class AsyncToSync(ast.NodeTransformer):     def __init__(self):         self.globals = {}      def visit_AsyncFunctionDef(self, node):         return ast.copy_location(             ast.FunctionDef(                 node.name,                 self.visit(node.args),                 [self.visit(stmt) for stmt in node.body],                 [self.visit(stmt) for stmt in node.decorator_list],                 node.returns and ast.visit(node.returns),             ),             node,         )      def visit_Await(self, node):         return self.visit(node.value)      def visit_Attribute(self, node):         if (             isinstance(node.value, ast.Name)             and isinstance(node.value.ctx, ast.Load)             and node.value.id == "asyncio"             and node.attr in asynciomap         ):             g, replacement = asynciomap[node.attr]             self.globals.update(g)             return ast.copy_location(                 ast.parse(replacement, mode="eval").body,                 node             )         return node   def transform_sync(f):     filename = inspect.getfile(f)     lines, lineno = inspect.getsourcelines(f)     ast_tree = ast.parse(textwrap.dedent(''.join(lines)), filename)     ast.increment_lineno(ast_tree, lineno - 1)      transformer = AsyncToSync()     transformer.visit(ast_tree)     tranformed_globals = {**f.__globals__, **transformer.globals}     exec(compile(ast_tree, filename, 'exec'), tranformed_globals)     return tranformed_globals[f.__name__] 

While the above is probably far from complete enough to fit all needs, and transforming AST trees can be daunting, the above would let you maintain just the async version and map that version to synchronous versions directly:

>>> import example >>> del example.UselessExample.ticker >>> example.main() Traceback (most recent call last):   File "<stdin>", line 1, in <module>   File "/.../example.py", line 32, in main     func(ue)   File "/.../example.py", line 21, in func     for value in ue.ticker(5): AttributeError: 'UselessExample' object has no attribute 'ticker' >>> example.UselessExample.ticker = transform_sync(example.UselessExample.a_ticker) >>> example.main() 0 1 2 3 4 0 1 2 3 4 
like image 119
Martijn Pieters Avatar answered Sep 29 '22 20:09

Martijn Pieters


async/await is infectious by design.

Accept that your code will have different users — synchronous and asynchronous, and that these users will have different requirements, that over time the implementations will diverge.

Publish separate libraries

For example, compare aiohttp vs. aiohttp-requests vs. requests.

Likewise, compare asyncpg vs. psycopg2.

How to get there

Opt1. (easy) clone implementation, allow them to diverge.

Opt2. (sensible) partial refactor, let e.g. async library depend on and import sync library.

Opt3. (radical) create a "pure" library that can be used both in sync and async program. For example, see https://github.com/python-hyper/hyper-h2 .

On the upside, testing is easier and thorough. Consider how hard (or impossible) it is force the test framework to evaluate all possible concurrent execution orders in an async program. Pure library doesn't need that :)

On the down-side this style of programming requires different thinking, is not always straightforward, and may be suboptimal. For example, instead of await socket.read(2**20) you'd write for event in fsm.push(data): ... and rely on your library user to provide you with data in good-sized chunks.

For context, see the backpressure argument in https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/

like image 29
Dima Tisnek Avatar answered Sep 29 '22 20:09

Dima Tisnek