Let's assume I have the following code:
import asyncio import threading queue = asyncio.Queue() def threaded(): import time while True: time.sleep(2) queue.put_nowait(time.time()) print(queue.qsize()) @asyncio.coroutine def async(): while True: time = yield from queue.get() print(time) loop = asyncio.get_event_loop() asyncio.Task(async()) threading.Thread(target=threaded).start() loop.run_forever()
The problem with this code is that the loop inside async
coroutine is never finishing the first iteration, while queue
size is increasing.
Why is this happening this way and what can I do to fix it?
I can't get rid of separate thread, because in my real code I use a separate thread to communicate with a serial device, and I haven't find a way to do that using asyncio
.
However, async and threading can run multiple IO operations truly at the same time. Asyncio vs threading: Async runs one block of code at a time while threading just one line of code at a time.
asyncio queues are designed to be similar to classes of the queue module. Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code. Note that methods of asyncio queues don't have a timeout parameter; use asyncio.
asyncio has an API for interoperating with Python's multiprocessing library. This lets us use async await syntax as well as asyncio APIs with multiple processes. Using this, we can get the benefits of the asyncio library even when using CPU-bound code.
asyncio.Queue
is not thread-safe, so you can't use it directly from more than one thread. Instead, you can use janus
, which is a third-party library that provides a thread-aware asyncio
queue:
import asyncio import threading import janus def threaded(squeue): import time while True: time.sleep(2) squeue.put_nowait(time.time()) print(squeue.qsize()) @asyncio.coroutine def async(aqueue): while True: time = yield from aqueue.get() print(time) loop = asyncio.get_event_loop() queue = janus.Queue(loop=loop) asyncio.Task(asyncio.ensure_future(queue.async_q)) threading.Thread(target=threaded, args=(queue.sync_q,)).start() loop.run_forever()
There is also aioprocessing
(full-disclosure: I wrote it), which provides process-safe (and as a side-effect, thread-safe) queues as well, but that's overkill if you're not trying to use multiprocessing
.
Edit
As pointed it out in other answers, for simple use-cases you can use loop.call_soon_threadsafe
to add to the queue, as well.
BaseEventLoop.call_soon_threadsafe
is at hand. See asyncio
doc for detail.
Simply change your threaded()
like this:
def threaded(): import time while True: time.sleep(1) loop.call_soon_threadsafe(queue.put_nowait, time.time()) loop.call_soon_threadsafe(lambda: print(queue.qsize()))
Here's a sample output:
0 1443857763.3355968 0 1443857764.3368602 0 1443857765.338082 0 1443857766.3392274 0 1443857767.3403943
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With