I'm reading out a webcam on OSX, which works fine with this simple script:
import cv2

camera = cv2.VideoCapture(0)
while True:
    try:
        (grabbed, frame) = camera.read()  # grab the current frame
        frame = cv2.resize(frame, (640, 480))  # resize the frame
        cv2.imshow("Frame", frame)  # show the frame to our screen
        cv2.waitKey(1)  # display it at least one ms before going to the next frame
    except KeyboardInterrupt:
        # cleanup the camera and close any open windows
        camera.release()
        cv2.destroyAllWindows()
        print "\n\nBye bye\n"
        break
I now want to read the video in a separate process, for which I have a script that is a lot longer and which correctly reads the video in a separate process on Linux:
import numpy as np
import time
import ctypes
import argparse

from multiprocessing import Array, Value, Process

import cv2


class VideoCapture:
    """
    Class that handles video capture from device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds (floating point is allowed)
        """
        self._cap = cv2.VideoCapture(device)
        self._delay = delay

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frame captures (in seconds)
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def get_stream_function(self):
        """
        Returns stream_function object function
        """
        def stream_function(image, finished):
            """
            Function keeps capturing frames until finished = 1
            :param image: shared numpy array for multiprocessing (see multiprocessing.Array)
            :param finished: synchronized wrapper for int (see multiprocessing.Value)
            :return: nothing
            """
            # Incorrect input array
            if image.shape != self.get_size():
                raise Exception("Capture: improper size of the input image")
            print("Capture: start streaming")
            # Capture frames until the finished flag is set to True
            while not finished.value:
                image[:, :, :] = self._proper_frame(self._delay)
            # Release the device
            self.release()

        return stream_function

    def release(self):
        self._cap.release()


def main():
    # Add program arguments
    parser = argparse.ArgumentParser(
        description='Captures the video from the webcamera and \n'
                    'writes it into the output file with predefined fps.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-output', dest="output", default="output.avi",
                        help='name of the output video file')
    parser.add_argument('-log', dest="log", default="frames.log",
                        help='name of the log file')
    parser.add_argument('-fps', dest="fps", default=25.,
                        help='frames per second value')

    # Read the arguments if any
    result = parser.parse_args()
    fps = float(result.fps)
    output = result.output
    log = result.log

    # Initialize VideoCapture object and auxiliary objects
    cap = VideoCapture()
    shape = cap.get_size()
    stream = cap.get_stream_function()

    # Define shared variables (which are synchronised, so race conditions are excluded)
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream, args=(frame, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow videocapture to start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Terminate working processes
        video_process.terminate()

    # The capturing works until keyboard interrupt is pressed.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            cv2.waitKey(1)  # display it at least one ms before going to the next frame
            time.sleep(0.1)
        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break


if __name__ == '__main__':
    main()
This works fine on Linux, but on OSX I'm having trouble because it can't seem to do a .read() on the created cv2.VideoCapture(device) object (stored in the variable self._cap).
After some searching I found this SO answer, which suggests using Billiard, a replacement for Python's multiprocessing which supposedly has some very useful improvements. So at the top of the file I simply added the import after my previous multiprocessing import (effectively overriding multiprocessing.Process):

from billiard import Process, forking_enable

and just before the instantiation of the video_process variable I use forking_enable as follows:

forking_enable(0)  # Supposedly this is all billiard needs to do its magic
video_process = Process(target=stream, args=(frame, finished))
So in this version (here on pastebin) I then ran the file again, which gives me this error:
pickle.PicklingError: Can't pickle <function stream_function at 0x...>: it's not found as __main__.stream_function
A search for that error led me to an SO question with a long list of answers, one of which suggested using the dill serialization lib to solve this. That lib, however, should be used with the Pathos multiprocessing fork. So I simply tried changing my multiprocessing import line from

from multiprocessing import Array, Value, Process

to

from pathos.multiprocessing import Array, Value, Process

But none of Array, Value and Process seem to exist in the pathos.multiprocessing package.
And from this point I'm totally lost. I'm searching for things I barely know enough about, and I don't even know in which direction to search or debug anymore.
So can any brighter soul than me help me capture video in a separate process? All tips are welcome!
Your first problem was that you could not access the webcam in a forked process. Several issues arise when external libraries are used with fork, because the fork operation does not clean up all the file descriptors opened by the parent process, leading to strange behavior. Libraries are often more robust to this kind of issue on Linux, but it is not a good idea to share an IO object such as cv2.VideoCapture between the two processes.

When you use billiard.forking_enable and set it to False, you ask the library not to use fork to start a new process but the spawn or forkserver methods, which are cleaner as they close all the file descriptors, but are also slower to start. This should not be an issue in your case. If you are using python3.4+, you can do this using multiprocessing.set_start_method('forkserver') for instance.

When you use one of these methods, the target function and the arguments need to be serialized to be passed to the child process. The serialization is done by default with pickle, which has several flaws, such as the one you mentioned: it cannot serialize locally defined functions, and it also cannot serialize cv2.VideoCapture. But you can simplify your program to make all the arguments for your Process picklable. Here is a tentative solution to your problem:
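As a minimal sketch of switching the start method (hedged: "forkserver" is Unix-only, and on macOS "spawn" is also available and often the safer choice):

```python
import multiprocessing as mp

def child():
    # Runs in a freshly started interpreter rather than a forked copy of
    # the parent, so no file descriptors (e.g. a camera handle) are inherited.
    print("child started via", mp.get_start_method())

if __name__ == "__main__":
    mp.set_start_method("forkserver")  # "spawn" also works on macOS
    p = mp.Process(target=child)
    p.start()
    p.join()
```

Note that set_start_method may only be called once per program, which is why it lives under the __main__ guard.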
import numpy as np
import time
import ctypes

from multiprocessing import set_start_method
from multiprocessing import Process, Array, Value

import cv2


class VideoCapture:
    """
    Class that handles video capture from device or video file
    """
    def __init__(self, device=0, delay=0.):
        """
        :param device: device index or video filename
        :param delay: delay between frame captures in seconds (float allowed)
        """
        self._delay = delay
        self._device = device
        self._cap = cv2.VideoCapture(device)
        assert self._cap.isOpened()

    def __getstate__(self):
        self._cap.release()
        return (self._delay, self._device)

    def __setstate__(self, state):
        self._delay, self._device = state
        self._cap = cv2.VideoCapture(self._device)
        assert self._cap.grab(), "The child could not grab the video capture"

    def _proper_frame(self, delay=None):
        """
        :param delay: delay between frame captures (in seconds)
        :return: frame
        """
        snapshot = None
        correct_img = False
        fail_counter = -1
        while not correct_img:
            # Capture the frame
            correct_img, snapshot = self._cap.read()
            fail_counter += 1
            # Raise exception if there's no output from the device
            if fail_counter > 10:
                raise Exception("Capture: exceeded number of tries to capture "
                                "the frame.")
            # Delay before we get a new frame
            time.sleep(delay)
        return snapshot

    def get_size(self):
        """
        :return: size of the captured image
        """
        return (int(self._cap.get(int(cv2.CAP_PROP_FRAME_HEIGHT))),
                int(self._cap.get(int(cv2.CAP_PROP_FRAME_WIDTH))), 3)

    def release(self):
        self._cap.release()


def stream(capturer, image, finished):
    """
    Function keeps capturing frames until finished = 1
    :param capturer: VideoCapture object, recreated in the child via pickling
    :param image: shared array for multiprocessing (see multiprocessing.Array)
    :param finished: synchronized wrapper for int (see multiprocessing.Value)
    :return: nothing
    """
    shape = capturer.get_size()

    # Wrap the shared buffer in a numpy array
    frame = np.ctypeslib.as_array(image.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])

    # Incorrect input array
    if frame.shape != capturer.get_size():
        raise Exception("Capture: improper size of the input image")
    print("Capture: start streaming")

    # Capture frames until the finished flag is set to True
    while not finished.value:
        frame[:, :, :] = capturer._proper_frame(capturer._delay)

    # Release the device
    capturer.release()


def main():
    # Initialize VideoCapture object and auxiliary objects
    cap = VideoCapture()
    shape = cap.get_size()

    # Define shared variables
    shared_array_base = Array(ctypes.c_uint8, shape[0] * shape[1] * shape[2])
    frame = np.ctypeslib.as_array(shared_array_base.get_obj())
    frame = frame.reshape(shape[0], shape[1], shape[2])
    finished = Value('i', 0)

    # Start processes which run in parallel
    video_process = Process(target=stream,
                            args=(cap, shared_array_base, finished))
    video_process.start()  # Launch capture process

    # Sleep for some time to allow videocapture to start working first
    time.sleep(2)

    # Termination function
    def terminate():
        print("Main: termination")
        finished.value = True
        # Wait for all processes to finish
        time.sleep(1)
        # Join the worker process
        video_process.join()

    # The capturing works until keyboard interrupt is pressed.
    while True:
        try:
            # Display the resulting frame
            cv2.imshow('frame', frame)
            # Display it at least one ms before going to the next frame
            time.sleep(0.1)
            cv2.waitKey(1)
        except KeyboardInterrupt:
            cv2.destroyAllWindows()
            terminate()
            break


if __name__ == '__main__':
    set_start_method("spawn")
    main()
I could not test it on mac at the moment, so it might not work out of the box, but there should not be multiprocessing related errors. Some notes:

- You need to recreate the cv2.VideoCapture object in the new child and grab the camera there, as only one process should read from the camera.
- You cannot pass the numpy wrapper to the child, as it will not share the mp.Array buffer (this is really weird and it took me a while to figure out). You need to pass the Array explicitly and recreate a wrapper.
- Maybe the issue in your first program with fork is only due to the shared cv2.VideoCapture, and recreating it in the stream function would solve your issue.

I assumed you were running your code in python3.4+, so I did not use billiard, but using forking_enable(False) instead of set_start_method should be kind of similar.

Let me know if this works!
The main challenge with multiprocessing is understanding the memory model in the case of separate memory address spaces.
Python makes things even more confusing, as it abstracts many of these aspects away, hiding several mechanisms under a few innocent-looking APIs.
When you write this logic:

# Initialize VideoCapture object and auxiliary objects
cap = VideoCapture()
shape = cap.get_size()
stream = cap.get_stream_function()
...
# Start processes which run in parallel
video_process = Process(target=stream, args=(frame, finished))
video_process.start()  # Launch capture process
you are passing to the Process a stream_function which refers to internal components of the VideoCapture class (self.get_size) but, more importantly, which is not available as a top-level function.
The child process won't be able to re-construct the required object, as what it receives is just the name of a function. It tries to look it up in the main module, hence the message:

pickle.PicklingError: Can't pickle <function stream_function at 0x...>: it's not found as __main__.stream_function

The child process is trying to resolve the function in the main module as __main__.stream_function, and the lookup fails.
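As a minimal illustration (make_worker here is a hypothetical stand-in for get_stream_function), pickle refuses any function that is not reachable as a top-level module attribute, because it serializes functions by qualified name rather than by value:

```python
import pickle

def make_worker():
    def worker():  # nested, so not reachable as a module-level name
        return 42
    return worker

try:
    pickle.dumps(make_worker())
except (pickle.PicklingError, AttributeError) as exc:
    # Python 3 raises AttributeError ("Can't pickle local object
    # 'make_worker.<locals>.worker'"); Python 2 raised PicklingError.
    print("cannot pickle:", exc)
```

Moving worker to module level (or making the target a picklable object, as in the other answer) avoids the error.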
My first suggestion would be to change your logic so that you pass to the child process the method returning stream_function:

video_process = Process(target=cap.get_stream_function, args=(...))
Yet you might still encounter problems, as you are sharing state between the two processes.
What I usually suggest to people when they approach multiprocessing paradigms in Python is to think about processes as if they were running on separate machines. In that case it becomes obvious that your architecture is problematic.
I would recommend separating the responsibilities of the two processes, making sure that one process (the child) deals with the entire capturing of the video and the other (the parent or a third process) deals with the processing of the frames.
This paradigm is known as the Producer and Consumer Problem, and it's very well suited to your system. The video capturing process would be the producer and the other one the consumer. A simple multiprocessing.Pipe or multiprocessing.Queue would make sure the frames are transferred from the producer to the consumer as soon as they are ready.
Here's an example in pseudo-code, as I don't know the video capturing APIs. The point is to deal with the whole video capturing logic in the producer process, abstracting it away from the consumer. The only thing the consumer needs to know is that it receives a frame object via a pipe.
def capture_video(writer):
    """This runs in the producer process."""
    # The VideoCapture class wraps the video acquisition logic
    cap = VideoCapture()
    while True:
        frame = cap.get_next_frame()  # the method returns the next frame
        writer.send(frame)  # send the new frame to the consumer process

def main():
    reader, writer = multiprocessing.Pipe(False)
    # Producer process
    video_process = Process(target=capture_video, args=[writer])
    video_process.start()  # Launch capture process
    while True:
        try:
            frame = reader.recv()  # receive next frame from the producer
            process_frame(frame)
        except KeyboardInterrupt:
            video_process.terminate()
            break
Note how there's no shared state between the processes (no need to share any array). The communication goes through a Pipe and is unidirectional, making the logic very simple. As I said above, this logic would also work across different machines; you would just need to replace the Pipe with a socket.
You might want a cleaner termination approach for the producer process. I would suggest you use a multiprocessing.Event. Just set it from the parent in the KeyboardInterrupt block and check its status in the child at every iteration (while not event.is_set()).
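A runnable sketch of that termination pattern, assuming nothing about the capture API (a counter stands in for the captured frames):

```python
import multiprocessing as mp
import time

def producer(writer, stop_event):
    n = 0
    while not stop_event.is_set():  # checked at every iteration
        writer.send(n)  # a counter stands in for a captured frame
        n += 1
        time.sleep(0.01)
    writer.close()

if __name__ == "__main__":
    reader, writer = mp.Pipe(False)
    stop = mp.Event()
    p = mp.Process(target=producer, args=(writer, stop))
    p.start()
    for _ in range(5):
        print(reader.recv())  # consume a few frames
    stop.set()  # ask the child to exit its loop cleanly...
    p.join()    # ...and wait for it, instead of calling terminate()
```

Unlike terminate(), setting the event lets the child finish its current iteration and release its resources before exiting.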