
Asyncio Queue waits until it is full before get returns something

I'm having a weird issue with asyncio.Queue: instead of returning an item as soon as it is available, the queue waits until it is full before returning anything. I noticed this while using a queue to store frames collected from cv2.VideoCapture: the larger the queue's maxsize, the longer it took to show anything on screen, and when something finally appeared, it was a playback of all the frames that had been collected in the queue.
Is that a feature, a bug, or am I just using this wrong?
Anyway, here is my code:

import asyncio  
import cv2  
import numpy as np  


async def collecting_loop(queue):  
    print("cl")  
    cap = cv2.VideoCapture(0)  
    while True:  
        _, img = cap.read()  
        await queue.put(img)  


async def processing_loop(queue):  
    print("pl")  
    await asyncio.sleep(0.1)  
    while True:  
        img = await queue.get()  
        cv2.imshow('img', img)  
        cv2.waitKey(5)  


async def main(e_loop):  
    print("running main")  
    queue = asyncio.Queue(maxsize=10)  # the loop= argument was removed in Python 3.10
    await asyncio.gather(collecting_loop(queue), processing_loop(queue))


loop = asyncio.get_event_loop()   
try:   
    loop.run_until_complete(main(e_loop=loop))   
except KeyboardInterrupt:   
    pass   
finally:   
    loop.close()   
Simon Kharmatsky asked Jan 02 '23 15:01


2 Answers

Is [the queue getter not waking up until the queue fills up] a feature, a bug, or am I just using this wrong?

You're using it wrong, but in a subtle way. As Andrew explained, queue.put doesn't guarantee a task switch, and the collector coroutine only runs blocking code and queue.put. Although each blocking call is short, asyncio doesn't know that: from the event loop's point of view you are invoking queue.put in a really tight loop. The queue getters simply don't get a chance to run until the queue fills up and put finally has to wait.

The correct way to integrate asyncio and OpenCV is to run the OpenCV code in a separate thread and have the coroutine await its result. The run_in_executor method makes that really simple:

async def collecting_loop(queue):  
    print("cl")  
    loop = asyncio.get_event_loop()
    cap = cv2.VideoCapture(0)  
    while True:  
        _, img = await loop.run_in_executor(None, cap.read)
        await queue.put(img)

run_in_executor will automatically suspend the collector coroutine while waiting for a new frame, allowing for the queued frame(s) to be processed in time.
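To try the same pattern without a camera, here is a minimal sketch that swaps cv2.VideoCapture for a stand-in. FakeCapture, n_frames, the None sentinel and the log list are hypothetical names made up for this illustration; only run_in_executor and asyncio.Queue are the real APIs discussed above.

```python
import asyncio
import time


class FakeCapture:
    """Hypothetical stand-in for cv2.VideoCapture: read() blocks briefly."""

    def __init__(self):
        self.count = 0

    def read(self):
        time.sleep(0.01)           # simulates waiting for the camera
        self.count += 1
        return True, self.count    # (ok, frame), same shape as cap.read()


async def collecting_loop(queue, cap, n_frames):
    loop = asyncio.get_running_loop()
    for _ in range(n_frames):
        # The blocking read runs in the default thread pool; this coroutine
        # is suspended meanwhile, so other tasks get to run.
        _, frame = await loop.run_in_executor(None, cap.read)
        await queue.put(frame)
    await queue.put(None)          # sentinel (assumption): no more frames


async def processing_loop(queue, log):
    while True:
        frame = await queue.get()
        if frame is None:
            break
        # Record the frame and how many items were still waiting behind it.
        log.append((frame, queue.qsize()))


async def main():
    queue = asyncio.Queue(maxsize=10)
    log = []
    await asyncio.gather(
        collecting_loop(queue, FakeCapture(), 5),
        processing_loop(queue, log),
    )
    return log


log = asyncio.run(main())
print(log)
```

The second element of each logged pair is the backlog left in the queue when that frame was handled. It should stay far below maxsize, since the consumer wakes up after every frame instead of after the queue has filled.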

user4815162342 answered Jan 05 '23 15:01


The problem is that await q.put() doesn't switch to another task on every call. It suspends only when the queue is full and the put has to wait for a free slot.

Inserting await asyncio.sleep(0) forces a task switch. It is similar to multithreaded code, where file.read() doesn't force an OS thread switch but time.sleep(0) does.
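The difference is easy to see in a small experiment with no OpenCV involved. This is only a sketch: producer, consumer, run_demo and the yield_each_put flag are names invented for the illustration.

```python
import asyncio


async def producer(queue, events, n_items, yield_each_put):
    for i in range(n_items):
        await queue.put(i)          # does not suspend while the queue has room
        events.append(f"put {i}")
        if yield_each_put:
            await asyncio.sleep(0)  # explicitly hand control back to the event loop


async def consumer(queue, events, n_items):
    for _ in range(n_items):
        item = await queue.get()
        events.append(f"get {item}")


async def run_demo(yield_each_put):
    queue = asyncio.Queue(maxsize=3)
    events = []
    await asyncio.gather(
        producer(queue, events, 3, yield_each_put),
        consumer(queue, events, 3),
    )
    return events


tight = asyncio.run(run_demo(yield_each_put=False))
cooperative = asyncio.run(run_demo(yield_each_put=True))
print(tight)
print(cooperative)
```

In the first run every put is logged before the first get, which is the behaviour from the question in miniature. With the sleep(0) in place, the consumer gets to run between puts.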

Misunderstandings like this are pretty common among newcomers. I discussed a very similar problem yesterday, see the GitHub issue.

P.S.

Your code actually has a much worse problem: you call blocking synchronous code from an async function, which is not how asyncio is meant to be used.

If no asynchronous OpenCV API exists (yet), you should run OpenCV functions in a separate thread.

The already mentioned janus library can help with passing data between sync and async code.
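For comparison, here is a rough standard-library-only sketch of the same hand-off. This is not the janus API, just one common way to push items from a plain thread into an asyncio.Queue with loop.call_soon_threadsafe. capture_thread, n_frames and the None sentinel are hypothetical, and time.sleep stands in for a blocking OpenCV call.

```python
import asyncio
import threading
import time


def capture_thread(loop, queue, n_frames):
    """Runs in a plain thread; stands in for a blocking OpenCV read loop."""
    for i in range(n_frames):
        time.sleep(0.01)  # simulated blocking work
        # asyncio.Queue is not thread-safe, so schedule the put on the loop.
        loop.call_soon_threadsafe(queue.put_nowait, i)
    loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel (assumption)


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()  # unbounded, so put_nowait cannot raise QueueFull
    worker = threading.Thread(
        target=capture_thread, args=(loop, queue, 5), daemon=True
    )
    worker.start()
    frames = []
    while True:
        frame = await queue.get()
        if frame is None:
            break
        frames.append(frame)
    worker.join()  # the thread has already sent its last item by now
    return frames


frames = asyncio.run(main())
print(frames)
```

The queue is left unbounded here on purpose: put_nowait on a bounded queue would raise QueueFull inside the loop callback if the consumer fell behind, so a bounded version needs its own back-pressure strategy.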

Andrew Svetlov answered Jan 05 '23 15:01