
How to use asyncio with existing blocking library?

I have a few blocking functions, foo and bar, that I can't change (they belong to an internal library I don't control and talk to one or more network services). How do I use them asynchronously? For example, I want to do the following:

results = []
for inp in inps:
    val = foo(inp)
    result = bar(val)
    results.append(result)

This is inefficient: I could call foo for the second input while waiting on the first, and likewise for bar. How do I wrap them so that they are usable with asyncio (i.e. the new async/await syntax)?

Let's assume the functions are re-entrant, i.e. it is fine to call foo again while a previous call to foo is still in progress.


Update

Extending the answer with a reusable decorator:

import asyncio
import functools


def run_in_executor(f):
    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # None selects the loop's default executor (a thread pool)
        return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))

    return inner

balki asked Dec 09 '16 15:12


3 Answers

There are (sort of) two questions here:

  1. How can I run blocking code asynchronously within a coroutine?
  2. How can I run multiple async tasks at the "same" time? (As an aside: asyncio is single-threaded, so it is concurrent, but not truly parallel.)

Concurrent tasks can be created using the high-level asyncio.create_task or the low-level asyncio.ensure_future. Starting with Python 3.11, they can also be created through task groups (asyncio.TaskGroup), as pioneered by the Trio library (the creator of Trio has an excellent blog post on the subject).
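As a rough sketch of the task-creation styles mentioned above: asyncio.create_task schedules a coroutine right away, and on Python 3.11+ asyncio.TaskGroup does the same while also waiting for every task when the async with block exits. The coroutine work and its delays are made up for illustration.

```python
import asyncio


async def work(name, delay):
    # Hypothetical coroutine standing in for any awaitable operation.
    await asyncio.sleep(delay)
    return name


async def with_create_task():
    # create_task schedules both coroutines immediately, so they run
    # concurrently while we await them one after the other.
    t1 = asyncio.create_task(work("a", 0.02))
    t2 = asyncio.create_task(work("b", 0.01))
    return [await t1, await t2]


async def with_task_group():
    # asyncio.TaskGroup exists only on Python 3.11+. Leaving the block
    # waits for all tasks created inside it.
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(work("a", 0.02))
        t2 = tg.create_task(work("b", 0.01))
    return [t1.result(), t2.result()]


async def main():
    results = await with_create_task()
    if hasattr(asyncio, "TaskGroup"):  # skip the 3.11+ variant on older Pythons
        assert await with_task_group() == results
    return results


task_results = asyncio.run(main())
print(task_results)  # ['a', 'b']
```

Both variants return the results in the order the tasks were created, regardless of which task finished first.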

To run synchronous code, you will need to run the blocking code in an executor. Example:

import concurrent.futures
import asyncio
import time

def blocking(delay):
    time.sleep(delay)
    print('Completed.')


async def non_blocking(executor):
    loop = asyncio.get_running_loop()
    # Run three of the blocking calls concurrently. run_in_executor
    # returns futures, which asyncio.wait accepts directly. If you want
    # access to the finished futures, assign the result:
    # "done, pending = await asyncio.wait(...)"
    await asyncio.wait(
        {
            # Returns after delay=12 seconds
            loop.run_in_executor(executor, blocking, 12),
            
            # Returns after delay=14 seconds
            loop.run_in_executor(executor, blocking, 14),
            
            # Returns after delay=16 seconds
            loop.run_in_executor(executor, blocking, 16)
        },
        return_when=asyncio.ALL_COMPLETED
    )

executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
asyncio.run(non_blocking(executor))

If you want to schedule these tasks in a for loop (as in your example), there are several strategies, but the underlying approach is the same: create the futures in the loop (or a list comprehension, etc.), await them with asyncio.wait, and then retrieve the results. Example:

# Inside a coroutine, with loop and executor set up as above
done, pending = await asyncio.wait(
    [loop.run_in_executor(executor, blocking_foo, *args) for args in inps],
    return_when=asyncio.ALL_COMPLETED
)

# Note that any errors raised during the above will be re-raised here; to
# handle errors, call task.exception() first and only call task.result()
# if it returned None. asyncio.wait returns sets, so the results are not
# in input order.
results = [task.result() for task in done]
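
If the results need to line up with the inputs, one option is asyncio.gather, which returns results in the order its awaitables were passed. A minimal sketch that also chains the two calls from the question; blocking_foo and blocking_bar are placeholder stand-ins, not the real library functions:

```python
import asyncio
import concurrent.futures
import time


def blocking_foo(inp):
    # Placeholder for the question's blocking foo.
    time.sleep(0.05)
    return inp * 2


def blocking_bar(val):
    # Placeholder for the question's blocking bar.
    time.sleep(0.05)
    return val + 1


async def process(loop, executor, inp):
    # Chain foo -> bar for one input; each call runs in a worker thread.
    val = await loop.run_in_executor(executor, blocking_foo, inp)
    return await loop.run_in_executor(executor, blocking_bar, val)


async def main(inps):
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # gather returns results in the same order as the inputs.
        return await asyncio.gather(*(process(loop, executor, i) for i in inps))


results = asyncio.run(main([1, 2, 3]))
print(results)  # [3, 5, 7]
```

Passing return_exceptions=True to asyncio.gather returns exceptions as ordinary results instead of raising the first one, which can be easier than inspecting each future by hand.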

Nick Badger answered Oct 10 '22 07:10


Extending the accepted answer to actually solve the problem in question.

Note: Requires Python 3.7+

import functools

from urllib.request import urlopen
import asyncio


def legacy_blocking_function():  # You cannot change this function
    r = urlopen("https://example.com")
    return r.read().decode()


def run_in_executor(f):
    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(None, lambda: f(*args, **kwargs))

    return inner


@run_in_executor
def foo(arg):  # Your wrapper for async use
    resp = legacy_blocking_function()
    return f"{arg}{len(resp)}"


@run_in_executor
def bar(arg):  # Another wrapper
    resp = legacy_blocking_function()
    return f"{len(resp)}{arg}"


async def process_input(inp):  # Modern async function (coroutine)
    res = await foo(inp)
    res = f"XXX{res}XXX"
    return await bar(res)


async def main():
    inputs = ["one", "two", "three"]
    input_tasks = [asyncio.create_task(process_input(inp)) for inp in inputs]
    print([await t for t in asyncio.as_completed(input_tasks)])
    # This doesn't work as expected :(
    # print([await t for t in asyncio.as_completed([process_input(inp) for inp in input_tasks])])


if __name__ == '__main__':
    asyncio.run(main())


balki answered Oct 10 '22 08:10


Since Python 3.9, you can also use asyncio.to_thread to run a blocking function asynchronously in a separate thread.

import asyncio
import time

def blocking():
    print("Started!")
    time.sleep(5)
    print("Finished!")

async def non_blocking():
    print("Started!")
    await asyncio.sleep(5)
    print("Finished!")

async def main():
    print("Blocking function:")
    start_time = time.time()
    await asyncio.gather(*(asyncio.to_thread(blocking) for i in range(5)))
    print("--- %s seconds ---" % (time.time() - start_time))

    print("Non-blocking function:")
    start_time = time.time()
    await asyncio.gather(*(non_blocking() for i in range(5)))
    print("--- %s seconds ---" % (time.time() - start_time))

asyncio.run(main())

Output:

Blocking function:
Started!
Started!
Started!
Started!
Started!
Finished!
Finished!
Finished!
Finished!
Finished!
--- 5.018239736557007 seconds ---
Non-blocking function:
Started!
Started!
Started!
Started!
Started!
Finished!
Finished!
Finished!
Finished!
Finished!
--- 5.005802392959595 seconds ---

Documentation: https://docs.python.org/3.11/library/asyncio-task.html#asyncio.to_thread
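
Applied to the loop from the question, this could look roughly like the sketch below. The bodies of foo and bar are invented stand-ins for the blocking library calls, and asyncio.gather is used here so that the results come back in input order.

```python
import asyncio
import time


def foo(inp):
    # Stand-in for the question's blocking foo.
    time.sleep(0.05)
    return inp.upper()


def bar(val):
    # Stand-in for the question's blocking bar.
    time.sleep(0.05)
    return val + "!"


async def process(inp):
    # Each blocking call runs in a thread of the loop's default executor
    # (asyncio.to_thread requires Python 3.9+).
    val = await asyncio.to_thread(foo, inp)
    return await asyncio.to_thread(bar, val)


async def main(inps):
    # All inputs are processed concurrently; results keep input order.
    return await asyncio.gather(*(process(inp) for inp in inps))


results = asyncio.run(main(["one", "two", "three"]))
print(results)  # ['ONE!', 'TWO!', 'THREE!']
```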

user2340939 answered Oct 10 '22 08:10