
Is there a way to use asyncio.Queue in multiple threads?

Let's assume I have the following code:

    import asyncio
    import threading

    queue = asyncio.Queue()

    def threaded():
        import time
        while True:
            time.sleep(2)
            queue.put_nowait(time.time())
            print(queue.qsize())

    @asyncio.coroutine
    def async():
        while True:
            time = yield from queue.get()
            print(time)

    loop = asyncio.get_event_loop()
    asyncio.Task(async())
    threading.Thread(target=threaded).start()
    loop.run_forever()

The problem with this code is that the loop inside the async coroutine never finishes its first iteration, while the queue size keeps increasing.

Why does this happen, and what can I do to fix it?

I can't get rid of the separate thread, because in my real code I use a separate thread to communicate with a serial device, and I haven't found a way to do that using asyncio.

Aleksandr Kovalev asked Oct 01 '15 13:10



2 Answers

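asyncio.Queue is not thread-safe, so you can't use it directly from more than one thread. Calling put_nowait from the worker thread does add items, but it does not wake the event loop, so the coroutine waiting on queue.get() is never resumed. Instead, you can use janus, a third-party library that provides a thread-aware asyncio queue: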

    import asyncio
    import threading
    import time

    import janus


    def threaded(squeue):
        # Worker thread: use the synchronous, thread-safe side of the queue.
        while True:
            time.sleep(2)
            squeue.put_nowait(time.time())
            print(squeue.qsize())


    async def consumer(aqueue):
        # Event loop side: await items through the asyncio-style interface.
        while True:
            timestamp = await aqueue.get()
            print(timestamp)


    async def main():
        # Create the queue while the event loop is running.
        queue = janus.Queue()
        threading.Thread(target=threaded, args=(queue.sync_q,)).start()
        await consumer(queue.async_q)


    asyncio.run(main())

There is also aioprocessing (full-disclosure: I wrote it), which provides process-safe (and as a side-effect, thread-safe) queues as well, but that's overkill if you're not trying to use multiprocessing.

Edit

As pointed out in another answer, for simple use cases you can also use loop.call_soon_threadsafe to add to the queue.
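Scheduling put_nowait through call_soon_threadsafe suits an unbounded queue. If the queue has a maxsize and the worker thread should wait for free space instead of having put_nowait raise QueueFull on the loop, the standard library's asyncio.run_coroutine_threadsafe is a related option that the answers here do not mention. The following is only a minimal sketch, assuming Python 3.9 or newer; the names producer and main, the sentinel value None, and maxsize=2 are illustrative choices, not part of the original code.

```python
import asyncio
import threading


def producer(loop, queue, items):
    """Worker thread: block until the event loop has accepted each item."""
    for item in items:
        # Run queue.put() on the loop thread; returns a concurrent.futures.Future.
        future = asyncio.run_coroutine_threadsafe(queue.put(item), loop)
        future.result()  # the thread waits here while the bounded queue is full
    # Hypothetical sentinel telling the consumer that no more items will come.
    asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue(maxsize=2)  # deliberately small to force back-pressure
    worker = threading.Thread(target=producer, args=(loop, queue, range(5)))
    worker.start()

    received = []
    while (item := await queue.get()) is not None:
        received.append(item)

    # Join in a helper thread so the event loop is never blocked by join().
    await asyncio.to_thread(worker.join)
    return received


result = asyncio.run(main())
print(result)
```

The worker never touches the queue directly: every put() runs on the loop thread, and future.result() only makes the worker wait for it. Joining the thread through asyncio.to_thread matters here, because a plain worker.join() inside the coroutine could block the loop before it has delivered the last result to the worker.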

dano answered Sep 19 '22 12:09


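BaseEventLoop.call_soon_threadsafe is the tool for this: it schedules a callback to run on the event loop thread and wakes the loop. See the asyncio documentation for details.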

Simply change your threaded() like this:

    def threaded():
        import time
        while True:
            time.sleep(1)
            loop.call_soon_threadsafe(queue.put_nowait, time.time())
            loop.call_soon_threadsafe(lambda: print(queue.qsize()))

Here's a sample output:

    0
    1443857763.3355968
    0
    1443857764.3368602
    0
    1443857765.338082
    0
    1443857766.3392274
    0
    1443857767.3403943
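For reference, here is a self-contained version of the same call_soon_threadsafe approach written with async/await. It is a sketch rather than a drop-in replacement, assuming Python 3.9 or newer: the names producer, consume and main, the count parameter, and the None sentinel are illustrative, and the short sleep stands in for a blocking read from the serial device.

```python
import asyncio
import threading
import time


def producer(loop, queue, count):
    """Worker thread: never touch the queue directly, ask the loop to do it."""
    for _ in range(count):
        time.sleep(0.01)  # stand-in for a blocking serial read
        # Schedules queue.put_nowait(...) on the loop thread and wakes the loop.
        loop.call_soon_threadsafe(queue.put_nowait, time.time())
    # Hypothetical sentinel so the consumer knows when to stop.
    loop.call_soon_threadsafe(queue.put_nowait, None)


async def consume(queue):
    """Collect items until the sentinel arrives."""
    received = []
    while True:
        item = await queue.get()
        if item is None:
            return received
        received.append(item)


async def main(count=5):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    worker = threading.Thread(target=producer, args=(loop, queue, count))
    worker.start()
    received = await consume(queue)
    # Join in a helper thread so the event loop itself is not blocked.
    await asyncio.to_thread(worker.join)
    return received


timestamps = asyncio.run(main())
print(len(timestamps))
```

Passing the loop and the queue into the thread as arguments avoids relying on module-level globals, and the bounded count lets the example terminate instead of running forever.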
Huazuo Gao answered Sep 23 '22 12:09