EDIT: I've had questions about what the video stream is, so I will offer more clarity. The stream is a live video feed from my webcam, accessed via OpenCV. I get each frame as the camera reads it, and send it to a separate process for processing. The process returns text based on computations done on the image. The text is then displayed onto the image. I need to display the stream in realtime, and it is ok if there is a lag between the text and the video being shown (i.e. if the text was applicable to a previous frame, that's ok).
Perhaps an easier way to think of this is that I'm doing image recognition on what the webcam sees. I send one frame at a time to a separate process to do recognition analysis on the frame, and send the text back to be put as a caption on the live feed. Obviously the processing takes more time than simply grabbing frames from the webcam and showing them, so if there is a delay in what the caption is and what the webcam feed shows, that's acceptable and expected.
What's happening now is that the live video I'm displaying lags because of the other processes (when I don't send frames to the process for computing, there is no lag). I've also ensured only one frame is enqueued at a time, to avoid overloading the queue and causing lag. I've updated the code below to reflect this detail.
I'm using the multiprocessing module in Python to help speed up my main program. However I believe I might be doing something incorrectly, as I don't think the computations are happening quite in parallel.
I want my program to read in images from a video stream in the main process, and pass on the frames to two child processes that do computations on them and send text back (containing the results of the computations) to the main process.
However, the main process seems to lag when I use multiprocessing, running about half as fast as without it, leading me to believe that the processes aren't running completely in parallel.
After doing some research, I surmised that the lag may have been due to communicating between the processes using a queue (passing an image from the main to the child, and passing back text from child to main).
However I commented out the computational step and just had the main process pass an image and the child return blank text, and in this case, the main process did not slow down at all. It ran at full speed.
Thus I believe that either
1) I am not optimally using multiprocessing
OR
2) These processes cannot truly be run in parallel (I would understand a little lag, but it's slowing the main process down in half).
Here's an outline of my code. There is only one consumer instead of two, but both consumers are nearly identical. If anyone could offer guidance, I would appreciate it.
class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        # other initialization stuff

    def run(self):
        while True:
            image = self.task_queue.get()
            # Do computations on image
            self.result_queue.put("text")
import multiprocessing

import cv2

tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks, results)
consumer.start()

# Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)

# Try to get the first frame
if vc.isOpened():
    rval, frame = vc.read()
else:
    rval = False

text = ""
while rval:
    if tasks.empty():
        tasks.put(frame)
    else:
        # Blocks until the consumer sends a result back
        text = results.get()
    # Add text to frame
    cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
    # Showing the frame with all the applied modifications
    cv2.imshow("preview", frame)
    cv2.waitKey(1)
    # Getting next frame from camera
    rval, frame = vc.read()
I want my program to read in images from a video stream in the main process
In producer/consumer implementations, which is what you have above, the producer (the part that puts tasks into the queue to be executed by the consumers) needs to be separate from the main/controlling process, so that it can add tasks in parallel with the main process reading output from the results queue.
Try the following. I have added a sleep in the consumer process to simulate processing, so you can see the main loop keep running while a frame is being analysed.
It would also be a good idea to limit the size of the task queue, to avoid runaway memory usage if processing cannot keep up with the input stream. You can specify a size when calling Queue(<size>). If the queue is at that size, calls to .put will block until the queue is no longer full.
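That blocking behaviour can be seen in isolation. A small sketch (the frame strings are placeholders; `queue.Full` is the stdlib exception that multiprocessing queues re-use):

```python
import multiprocessing
import queue

if __name__ == "__main__":
    q = multiprocessing.Queue(1)   # bounded queue: capacity of one item
    q.put("frame-1")               # fills the queue

    # A plain q.put("frame-2") would now block until a consumer
    # calls .get(). Non-blocking alternatives:
    print(q.full())                # no room for another frame
    try:
        q.put_nowait("frame-2")    # raises instead of blocking
    except queue.Full:
        print("queue full, dropping frame")

    print(q.get())                 # frees the slot again
```

Dropping frames when the queue is full, as in the `except` branch, is exactly the behaviour you want for a live feed: the consumer always works on a recent frame rather than a growing backlog.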
import time
import multiprocessing

import cv2


class ImageProcessor(multiprocessing.Process):

    def __init__(self, tasks_q, results_q):
        multiprocessing.Process.__init__(self)
        self.tasks_q = tasks_q
        self.results_q = results_q

    def run(self):
        while True:
            image = self.tasks_q.get()
            # Do computations on image
            time.sleep(1)
            # Send the result back to be displayed on the stream
            self.results_q.put("text")


# Tasks queue with size 1 - only want one image queued
# for processing at a time.
# Queue size should therefore match number of processes.
tasks_q, results_q = multiprocessing.Queue(1), multiprocessing.Queue()
processor = ImageProcessor(tasks_q, results_q)
processor.start()
def capture_display_video(vc):
    rval, frame = vc.read()
    text = ""
    while rval:
        # Only queue a frame when the processor is free to take one
        if not tasks_q.full():
            tasks_q.put(frame)
        # Pick up the latest caption if one is ready
        if not results_q.empty():
            text = results_q.get()
        cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        cv2.imshow("preview", frame)
        cv2.waitKey(1)
        rval, frame = vc.read()


cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
if not vc.isOpened():
    raise Exception("Cannot capture video")

capture_display_video(vc)
processor.terminate()
Here's a more elegant (IMHO) solution that utilizes multiple processes for processing your frames:
import multiprocessing

import cv2


def process_image(frame):
    # Do computations on the frame
    return "text", frame


pool = multiprocessing.Pool()


def image_source():
    # Creating window and starting video capturer from camera
    cv2.namedWindow("preview")
    vc = cv2.VideoCapture(0)
    # Try to get the first frame
    if vc.isOpened():
        rval, frame = vc.read()
    else:
        rval = False
    while rval:
        yield frame
        # Getting next frame from camera
        rval, frame = vc.read()


for (text, frame) in pool.imap(process_image, image_source()):
    # Add text to frame
    cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
    # Showing the frame with all the applied modifications
    cv2.imshow("preview", frame)
    cv2.waitKey(1)
Pool.imap should allow you to iterate through the pool's results while it's still processing other images from your cam. Note that process_image must be defined at module level so the pool's worker processes can pickle and import it.