Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Converting a Python function with a callback to an asyncio awaitable

I want to use the PyAudio library in an async context, but the main entry point for the library only has a callback-based API:

import pyaudio

def callback(in_data, frame_count, time_info, status):
    # Do something with data

pa = pyaudio.PyAudio()
self.stream = self.pa.open(
    stream_callback=callback
)

How I'm hoping to use it is something like this:

pa = SOME_ASYNC_COROUTINE()
async def listen():
    async for block in pa:
        # Do something with block

The problem is, I'm not sure how to convert this callback syntax to a future that completes when the callback fires. In JavaScript I would use promise.promisify(), but Python doesn't seem to have anything like that.

like image 743
Migwell Avatar asked Jan 01 '19 06:01

Migwell


People also ask

How do I call a function asynchronously in Python?

To run an async function (coroutine) you have to call it using an Event Loop. Event Loops: You can think of Event Loop as functions to run asynchronous tasks and callbacks, perform network IO operations, and run subprocesses. Example 1: Event Loop example to run async Function to run a single async function: Python3.

How is Python Asyncio implemented?

An async function uses the await keyword to denote a coroutine. When using the await keyword, coroutines release the flow of control back to the event loop. To run a coroutine, we need to schedule it on the event loop. After scheduling, coroutines are wrapped in Tasks as a Future object.

Which function is used to run Awaitables concurrently in Asyncio?

gather() method - It runs awaitable objects (objects which have await keyword) concurrently.

When should I use Asyncio Python?

asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc. asyncio is often a perfect fit for IO-bound and high-level structured network code.

Why can’t I use a callback with asyncio?

This requires precautions to correctly communicate with asyncio. The callback cannot be modeled by a single future because it is invoked multiple times, whereas a future can only have one result. Instead, it must be converted to an async iterator, just as shown in your sample code. Here is one possible implementation:

What is the difference between callback and async iterator in pyaudio?

The callback may be called to call from an arbitrary thread and is thus safe to pass to pyaudio.open, while the async iterator should be given to async forin an asyncio coroutine, which will be suspended while waiting for the next value:

What is async/await in Python?

async/await: two new Python keywords that are used to define coroutines. asyncio: the Python package that provides a foundation and API for running and managing coroutines. Coroutines (specialized generator functions) are the heart of async IO in Python, and we’ll dive into them later on.

What is asyncio in Python?

The asyncio package is billed by the Python documentation as a library to write concurrent code. However, async IO is not threading, nor is it multiprocessing. It is not built on top of either of these.


2 Answers

An equivalent of promisify wouldn't work for this use case for two reasons:

  • PyAudio's async API doesn't use the asyncio event loop - the documentation specifies that the callback is invoked from a background thread. This requires precautions to correctly communicate with asyncio.
  • The callback cannot be modeled by a single future because it is invoked multiple times, whereas a future can only have one result. Instead, it must be converted to an async iterator, just as shown in your sample code.

Here is one possible implementation:

def make_iter():
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()
    def put(*args):
        loop.call_soon_threadsafe(queue.put_nowait, args)
    async def get():
        while True:
            yield await queue.get()
    return get(), put

make_iter returns a pair of <async iterator, put-callback>. The returned objects hold the property that invoking the callback causes the iterator to produce its next value (the arguments passed to the callback). The callback may be called to call from an arbitrary thread and is thus safe to pass to pyaudio.open, while the async iterator should be given to async for in an asyncio coroutine, which will be suspended while waiting for the next value:

async def main():
    stream_get, stream_put = make_iter()
    stream = pa.open(stream_callback=stream_put)
    stream.start_stream()
    async for in_data, frame_count, time_info, status in stream_get:
        # ...

asyncio.get_event_loop().run_until_complete(main())

Note that, according to the documentation, the callback must also return a meaningful value, a tuple of frames and a Boolean flag. This can be incorporated in the design by changing the fill function to also receive the data from the asyncio side. The implementation is not included because it might not make much sense without an understanding of the domain.

like image 157
user4815162342 Avatar answered Oct 05 '22 23:10

user4815162342


You may want to use a Future

class asyncio.Future(*, loop=None)¶

A Future represents an eventual result of an asynchronous operation. Not thread-safe.

Future is an awaitable object. Coroutines can await on Future objects until they either have a result or an exception set, or until they are cancelled.

Typically Futures are used to enable low-level callback-based code (e.g. in protocols implemented using asyncio transports) to interoperate with high-level async/await code.

The rule of thumb is to never expose Future objects in user-facing APIs, and the recommended way to create a Future object is to call loop.create_future(). This way alternative event loop implementations can inject their own optimized implementations of a Future object.

A silly example:

def my_func(loop):
    fut = loop.create_future()
    pa.open(
        stream_callback=lambda *a, **kw: fut.set_result([a, kw])
    )
    return fut


async def main(loop):
    result = await my_func(loop)  # returns a list with args and kwargs 

I assume that pa.open runs in a thread or a subprocess. If not, you may also need to wrap the call to open with asyncio.loop.run_in_executor

like image 31
Oleksandr Fedorov Avatar answered Oct 05 '22 23:10

Oleksandr Fedorov