Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Communication between async tasks and synchronous threads in python

I am looking for the best solution for communication between async tasks and methods/functions that run in a thread pool executor from concurrent.futures. In previous synchronous projects, I would use the queue.Queue class. I assume that any method should be thread safe and therefore the asyncio.queue will not work.

I have seen people extend the queue.Queue class to do something like:

class async_queue(Queue):
  async def aput(self, item):
    self.put_nowait(item)

  async def aget(self):
    resp = await asyncio.get_event_loop().run_in_executor( None, self.get )
    return resp

Is there a better way?

like image 719
Scott Avatar asked Mar 02 '23 23:03

Scott


1 Answers

I would recommend going the other way around: using the asyncio.Queue class to communicate between the two worlds. This has the advantage of not having to spend a slot in the thread pool on operations that take a long time to complete, such as a get().

Here is an example:

class Queue:
    def __init__(self):
        self._loop = asyncio.get_running_loop()
        self._queue = asyncio.Queue()

    def sync_put_nowait(self, item):
        self._loop.call_soon(self._queue.put_nowait, item)

    def sync_put(self, item):
        asyncio.run_coroutine_threadsafe(self._queue.put(item), self._loop).result()

    def sync_get(self):
        return asyncio.run_coroutine_threadsafe(self._queue.get(item), self._loop).result()

    def async_put_nowait(self, item):
        self._queue.put_nowait(item)

    async def async_put(self, item):
        await self._queue.put(item)

    async def async_get(self):
        return await self._queue.get()

The methods prefixed with sync_ are meant to be invoked by sync code (running outside the event loop thread). The ones prefixed with async_ are to be called by code running in the event loop thread, regardless of whether they are actually coroutines. (put_nowait, for example, is not a coroutine, but it still must be distinguished between a sync and an async version.)

like image 200
user4815162342 Avatar answered Mar 05 '23 16:03

user4815162342