I'm having a weird issue with asyncio.Queue
- instead of returning an item as soon as it available, the queue waits until it is full before returning anything. I realized that while using a queue to store frames collected from cv2.VideoCapture
, the larger the maxsize
of the queue was, the longer it took to show anything on screen, and then, it looked like a sequence of all the frames collected into the queue.
Is that a feature, a bug, or am i just using this wrong?
Anyway, here is my code
import asyncio
import cv2
import numpy as np
async def collecting_loop(queue):
print("cl")
cap = cv2.VideoCapture(0)
while True:
_, img = cap.read()
await queue.put(img)
async def processing_loop(queue):
print("pl")
await asyncio.sleep(0.1)
while True:
img = await queue.get()
cv2.imshow('img', img)
cv2.waitKey(5)
async def main(e_loop):
print("running main")
queue = asyncio.Queue(loop=e_loop, maxsize=10)
await asyncio.gather(collecting_loop(queue), processing_loop(queue))
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(e_loop=loop))
except KeyboardInterrupt:
pass
finally:
loop.close()
Is [the queue getter not waking up until the queue fills up] a feature, a bug, or am i just using this wrong?
You're using it wrong, but in a subtle way. As Andrew explained, queue.put
doesn't guarantee a task switch, and the collector coroutine only runs blocking code and queue.put
. Although the blockade is short, asyncio doesn't know that and thinks you are invoking queue.put
in a really tight loop. The queue getters simply don't get a chance to run until the queue fills up.
The correct way to integrate asyncio and cv is to run the cv code in a separate thread and have the asyncio event loop wait for it to finish. The run_in_executor
method makes that really simple:
async def collecting_loop(queue):
print("cl")
loop = asyncio.get_event_loop()
cap = cv2.VideoCapture(0)
while True:
_, img = await loop.run_in_executor(None, cap.read)
await queue.put(img)
run_in_executor
will automatically suspend the collector coroutine while waiting for a new frame, allowing for the queued frame(s) to be processed in time.
The problem is that await q.put()
doesn't switch to another task every call. Actually it does only when inserting a new value is suspended by queue-full state transition.
Inserting await asyncio.sleep(0)
forces task switch.
Like in multithreaded code file.read()
doesn't enforce OS thread switching but time.sleep(0)
does.
Misunderstandings like this are pretty common for newbies, I've discussed very similar problem yesterday, see github issue.
P.S.
Your code has much worse problem actually: you call blocking synchronous code from async function, it just is not how asyncio works.
If no asynchronous OpenCV API exists (yet) you should run OpenCV functions in a separate thread.
Already mentioned janus can help with passing data between sync and async code.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With