Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Async queue of tasks without a await

I have an infinite loop that is capturing video from a camera. Every so often takes an image and processes it to find faces. This process could be running as a background task given that the main process does not need a response from it. So, is there a way to make a queue of asynchronous tasks without the need of stop the main loop for any reason?

In other words. I need to create a queue of tasks that are running concurrently while the main loop is running to and this one could add more tasks to the queue.

I've tried to do it with asyncio, but it does not work. I expected an out put like:

Main loop
Main loop
Main loop
hello
Main loop 
hello
...

But I got:

Main loop             
hello                                                                                                     
Main loop           
hello                                                                                                    
Main loop                                      
hello                                                                                      
Main loop                                               
hello
...

What does it mean they run one at a time.

import os
import cv2
import numpy as np
import asyncio
import time


async def process_frame(frame: np.array) -> None:
    await  asyncio.sleep(2)
    print('hello')
    # Process that could take a lot of time
    # proces the image in a db and if a face from the db is recognized in the image
    # this function will send a request to an web service
    # The main process  does not have to await for a response
    


async def main():
    video_capture = cv2.VideoCapture(0) # Initialize camera
    frame_rate = 30
    prev = 0
    while video_capture.isOpened():
        time_elapsed = time.time() - prev

        _, frame = video_capture.read() # Getting frame from opencv (cv2)
        cv2.imshow('Video', frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        print('Main loop')
        if time_elapsed > 1./frame_rate:
            prev = time.time()
            task = asyncio.create_task(process_frame(frame)) # This function needs to be sent to a async queue
            await task
    video_capture.release()


if __name__ == '__main__':
    asyncio.run(main())
like image 830
HBaena Avatar asked Aug 30 '25 17:08

HBaena


1 Answers

The immediate problem is that you await the task immediately after creating it. That will suspend the current coroutine until task finishes, which is what you don't want.

However, when you remove the await, you face the more serious problem - that you are calling blocking functions, such as video_capture.read(), from main which is supposed to be async. That will block the event loop and won't allow the background tasks to proceed.

To fix the issue, you can completely decouple your sync code from the asyncio code, by running the asyncio event loop in a separate thread, and using asyncio.run_coroutine_threadsafe to submit coroutines to it. For example (untested):

import os, cv2, numpy as np, asyncio, time

async def process_frame(frame):
    await asyncio.sleep(2)
    print('hello')
    # ...
    
_loop = asyncio.get_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()

def main():
    video_capture = cv2.VideoCapture(0) # Initialize camera
    frame_rate = 30
    prev = 0
    while video_capture.isOpened():
        time_elapsed = time.time() - prev

        _, frame = video_capture.read()
        cv2.imshow('Video', frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        print('Main loop')
        if time_elapsed > 1./frame_rate:
            prev = time.time()
            asyncio.run_coroutine_threadsafe(process_frame(frame), _loop)
    video_capture.release()

if __name__ == '__main__':
    main()
like image 106
user4815162342 Avatar answered Sep 02 '25 06:09

user4815162342