In my previous post, we found a way to pass an image file from one Python to another: pass video data from one python script to another
I am now trying to pass a video (successive images):
write.py
import sys
import numpy as np
import cv2
from PIL import Image
import io
import time
while True:
img = cv2.imread('cat.jpg')
bimg = cv2.imencode('.jpg',img)[1]
sys.stdout.buffer.write(bimg)
sys.stdout.flush()
time.sleep(1)
read.py:
import sys
from PIL import Image
import io
import cv2
import numpy as np
from io import BytesIO
while True:
data = sys.stdin.buffer.read()
img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(), np.uint8), cv2.IMREAD_UNCHANGED)
cv2.imshow('image', img_np)
cv2.waitKey(0)
If I output the write.py data to terminal, it prints. If I manually hand data to read.py that gets read. But put them together (python3 write.py | python3 read.py
) and it just hangs. write.py just writes once, and read.py never seems to get it.
My guess is that the read code is waiting for the write code to "end" before it wraps up the data package and calls it an image. Though if that were the case, I would think it that doing a flush would fix it.
You have mentioned that your image to send is not a consistent size, but I have to assume if it's coming from the same camera (for a given video stream) the raw image size does not change, rather just the compressed image size. I would imagine you likely have plenty of RAM to store at least one frame un-compressed in memory at a time, and you're just introducing processing overhead with all the compression and decompression.
Given that I would create a shared buffer using multiprocessing.shared_memory
which can share frames between the two processes (you can even create a circular buffer of a couple frames if you wanna get real fancy, and prevent screen tearing, but it wasn't a big problem in my test)
Given that cv2.VideoCapture().read()
can read straight into an existing array, and you can create a numpy array which uses the shared memory as it's buffer, you can read the data into the shared memory with zero extra copying. Using this I was able to read nearly 700 frames per second from a video file encoded with H.264 at 1280x688 resolution.
from multiprocessing.shared_memory import SharedMemory
import cv2
from time import sleep
import numpy as np
vid_device = r"D:\Videos\movies\GhostintheShell.mp4" #a great movie
#get the first frame to calculate size
cap = cv2.VideoCapture(vid_device)
success, frame = cap.read()
if not success:
raise Exception("error reading from video")
#create a shared memory for sending the frame shape
frame_shape_shm = SharedMemory(name="frame_shape", create=True, size=frame.ndim*4) #4 bytes per dim as long as int32 is big enough
frame_shape = np.ndarray(3, buffer=frame_shape_shm.buf, dtype='i4') #4 bytes per dim as long as int32 is big enough
frame_shape[:] = frame.shape
#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer", create=True, size=frame.nbytes)
frame_buffer = np.ndarray(frame_shape, buffer=frame_buffer_shm.buf, dtype=frame.dtype)
input("writer is ready: press enter once reader is ready")
try: #use keyboardinterrupt to quit
while True:
cap.read(frame_buffer) #read data into frame buffer
# sleep(1/24) #limit framerate-ish (hitting actual framerate is more complicated than 1 line)
except KeyboardInterrupt:
pass
#cleanup: IMPORTANT, close this one first so the reader doesn't unlink() the
# shm's before this file has exited. (less important on windows)
cap.release()
frame_buffer_shm.close()
frame_shape_shm.close()
The reader process looks very similar, but instead of creating a video device, and read
ing frames, we just construct the shared array, and imshow
a bunch. The GUI isn't quite as fast as just dumping the data, so we don't get quite 700 fps, but up to 500's isn't bad...
from multiprocessing.shared_memory import SharedMemory
import cv2
import numpy as np
#create a shared memory for reading the frame shape
frame_shape_shm = SharedMemory(name="frame_shape")
frame_shape = np.ndarray([3], buffer=frame_shape_shm.buf, dtype='i4')
#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer")
#create the framebuffer using the shm's memory
frame_buffer = np.ndarray(frame_shape, buffer=frame_buffer_shm.buf, dtype='u1')
try:
while True:
cv2.imshow('frame', frame_buffer)
cv2.waitKey(1) #this is needed for cv2 to update the gui
except KeyboardInterrupt:
pass
#cleanup: IMPORTANT the writer process should close before this one, so nothing
# tries to access the shm after unlink() is called. (less important on windows)
frame_buffer_shm.close()
frame_buffer_shm.unlink()
frame_shape_shm.close()
frame_shape_shm.unlink()
EDIT: the user's other questions suggested a version of python earlier than 3.8 may be a requirement (or even working across versions), so here's an example of using posix_ipc
in-place of multiprocessing.shared_memory
to create the frame buffer (and how to clean it up):
#creation
shm = posix_ipc.SharedMemory(name="frame_buf",
flags=posix_ipc.O_CREX, #if this fails, cleanup didn't happen properly last time
size=frame.nbytes)
shm_map = mmap.mmap(shm.fd, shm.size)
buf = memoryview(shm_map)
#create the frame buffer
frame_buffer = np.ndarray(frame.shape, buffer=buf, dtype=frame.dtype)
frame_buffer[:] = frame[:] #copy first frame into frame buffer
#cleanup
shm.close_fd() #can happen after opening mmap
buf.release() #must happen after frame_buffer is no longer needed and before closing mmap
shm_map.close()
shm.unlink() #must only call from one of the two processes. unlink tells the os to reclaim the space once all handles are closed.
I think I figured it out. In read.py
, sys.stdin.buffer.read()
reads and waits until the stdin
pipe is closed but write.py
never actually closes its stdout
because of the while True
loop. This proof of concept simplified example works:
write.py
import sys
import time
sys.stdout.buffer.write(b"Hello world")
sys.stdout.buffer.flush()
# Note if we comment out the code bellow it works again
while True:
# Keep this alive but don't have `while True:pass`
# because my computer might crash :D
time.sleep(10)
and read.py
import sys
with open("output.txt", "w") as file:
file.write(sys.stdin.read())
This will also hang and will never actually write anything to "output.txt"
. If we remove the while True
loop from write.py
the code will no longer hang and "Hello World"
will be written to "output.py"
because when write.py
is finished writing it will close its process and that will close the pipe. To fix this issue I recommend changing read.py
to something like this:
import sys
while True:
with open("output.txt", "a") as file:
file.write(sys.stdin.read(1))
Solution:
write.py
import sys
import time
MAX_FILE_SIZE = 16 # bytes
msg = b"Hello world"
# Tell `reader.py` that it needs to read x number of bytes.
length = len(msg)
# We also need to tell `read.py` how many bytes it needs to read.
# This means that we have reached the same problem as before.
# To fix that issue we are always going to send the number of bytes but
# We are going to pad it with `0`s at the start.
# https://stackoverflow.com/a/339013/11106801
length = str(length).zfill(MAX_FILE_SIZE)
sys.stdout.buffer.write(length.encode())
sys.stdout.buffer.write(msg)
sys.stdout.buffer.flush()
# We also need to tell `read.py` that it was the last file that we send
# Sending `1` means that the file has ended
sys.stdout.buffer.write(b"1")
sys.stdout.buffer.flush()
# Note if we comment out the code bellow it works again
while True:
# Keep this alive but don't have `while True:pass`
# because my computer might crash :D
time.sleep(10)
and read.py
import sys
import time
MAX_FILE_SIZE = 16 # bytes
while True:
time.sleep(1) # Make sure `write.py` has sent the data
# Read `MAX_FILE_SIZE` number of bytes and convert it to an int
# So that we know the size of the file comming in
length = int(sys.stdin.buffer.read(MAX_FILE_SIZE))
time.sleep(1) # Make sure `write.py` has sent the data
# Here you can switch to a different file every time `writer.py`
# Sends a new file
with open("output.txt", "wb") as file:
file.write(sys.stdin.buffer.read(length))
file_ended = sys.stdin.buffer.read(1)
if file_ended == b"1":
# File has ended
break
else:
# We are going to start reading again for the next file:
pass
Edit: The solution works like this:
read.py
if it should be expecting another file or notFor part 1, we just encode the length of the file as a string that is padded with 0s at the front. Note: Make sure that the MAX_FILE_SIZE
is larger than the size of the largest file (large numbers will slightly decrease the performance). For part 3, if we send a "1"
it will mean that there are no more files to be sent. Otherwise reader.py
will wait and accept the next file. So write.py
will become:
from math import log
import time
import sys
import cv2
MAX_FILE_SIZE = 62914560 # bytes
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE, 2)+1)
def write_file(buffer, data, last_file=False):
# Tell `reader.py` that it needs to read x number of bytes.
length = len(data)
# We also need to tell `read.py` how many bytes it needs to read.
# This means that we have reached the same problem as before.
# To fix that issue we are always going to send the number of bytes but
# We are going to pad it with `0`s at the start.
# https://stackoverflow.com/a/339013/11106801
length = str(length).zfill(MAX_FILE_SIZE)
with open("output.txt", "w") as file:
file.write(length)
buffer.write(length.encode())
# Write the actual data
buffer.write(data)
# We also need to tell `read.py` that it was the last file that we send
# Sending `1` means that the file has ended
buffer.write(str(int(last_file)).encode())
buffer.flush()
while True:
img = cv2.imread("img.jpg")
bimg = cv2.imencode(".jpg", img)[1]
# Call write_data
write_file(sys.stdout.buffer, bimg, last_file=False)
# time.sleep(1) # Don't need this
and read.py
will become:
from io import BytesIO
from math import log
import numpy as np
import time
import cv2
import sys
MAX_FILE_SIZE = 62914560 # bytes
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE, 2)+1)
def read(buffer, number_of_bytes):
output = b""
while len(output) < number_of_bytes:
output += buffer.read(number_of_bytes - len(output))
assert len(output) == number_of_bytes, "An error occured."
return output
def read_file(buffer):
# Read `MAX_FILE_SIZE` number of bytes and convert it to an int
# So that we know the size of the file comming in
length = int(read(buffer, MAX_FILE_SIZE))
# Here you can switch to a different file every time `writer.py`
# Sends a new file
data = read(buffer, length)
# Read a byte so that we know if it is the last file
file_ended = read(buffer, 1)
return data, (file_ended == b"1")
while True:
print("Reading file")
data, last_file = read_file(sys.stdin.buffer)
img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(), np.uint8),
cv2.IMREAD_UNCHANGED)
cv2.imshow("image", img_np)
cv2.waitKey(0)
if last_file:
break;
It is quite easy to send frames from one python file to another using ZeroMQ.
ZeroMQ
Install via PyPI: pip install -U pyzmq
. There are multiple way to send frames.
This is an example of using PUBLISHER and SUBSCRIBER
# writer | publisher
import base64
import time
import zmq
import cv2
# Prepare our context and publisher
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5563")
CAM_INDEX_OR_URI = 0
capture = cv2.VideoCapture(CAM_INDEX_OR_URI)
assert capture.isOpened(), "Cannot open camera"
while True:
# Write two messages, each with an envelope and content
# capture frame-by-frame
ret, frame = capture.read()
if not ret:
print("[+] No frame received. Stream ended.")
break
# resize the frame
frame = cv2.resize(frame, (640, 480))
encoded, buffer = cv2.imencode(".jpg", frame)
# all is good
# cv2.imshow("Frames", frame)
# stop with Esc key (27)
if cv2.waitKey(1) == 27:
break
sent_frame = base64.b64encode(buffer)
publisher.send_multipart([b"camera_A", sent_frame])
time.sleep(0.01)
# We never get here but clean up anyhow
publisher.close()
context.term()
capture.release()
cv2.destroyAllWindows()
# reader.py | subscriber
import numpy as np
import base64
import zmq
import cv2
# Prepare our context and publisher
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5563")
subscriber.setsockopt_string(zmq.SUBSCRIBE, "camera_A")
while True:
# Read envelope with address
[address, contents] = subscriber.recv_multipart()
receive_frame = base64.b64decode(contents)
frame = np.frombuffer(receive_frame, dtype=np.uint8)
frame = cv2.imdecode(frame, 1)
cv2.namedWindow("Frames", cv2.WINDOW_NORMAL)
cv2.imshow("Frames", frame)
# stop with Esc key (27)
if cv2.waitKey(1) == 27:
break
subscriber.close()
context.term()
cv2.destroyAllWindows()
You could also consider using diskcache
. It allows passing python objects through memory. It is like Redis but all Python and does not require a server. NB: pip install --upgrade diskcache
. You can tweak to start sending live frames from camera | video
# writer.py
import time
from pathlib import Path
import diskcache as dc
import cv2
tmp = Path("/tmp/stream")
with dc.Cache(tmp) as cache:
print(f"[+] Ready to push data to {tmp}.")
while True:
img = cv2.imread("cat.jpg")
cache.push(img, expire=5)
time.sleep(10)
# reader.py
import time
from pathlib import Path
import diskcache as dc
import cv2
tmp = Path("/tmp/stream")
with dc.Cache(tmp) as cache:
print(f"[+] Ready to pull data from {tmp}")
while True:
(key, value), _ = cache.pull(expire_time=True)
if key:
cv2.imshow("cat", value)
cv2.waitKey(0)
cv2.destroyAllWindows()
time.sleep(0.1)
I will go in these directions and not sys
because you have total control over stream data. See diskcache Documentation
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With