Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to pass video stream from one Python to another?

In my previous post, we found a way to pass an image file from one Python to another: pass video data from one python script to another

I am now trying to pass a video (successive images):

write.py

import sys
import numpy as np
import cv2
from PIL import Image
import io
import time

# Re-encode the same JPEG once per second and stream the encoded bytes to stdout.
while True:
    img = cv2.imread('cat.jpg')
    bimg = cv2.imencode('.jpg',img)[1]  # imencode returns (success, buffer); keep only the buffer
    sys.stdout.buffer.write(bimg)  # raw JPEG bytes; nothing marks where one frame ends
    sys.stdout.flush()
    time.sleep(1)

read.py:

import sys
from PIL import Image
import io
import cv2
import numpy as np
from io import BytesIO
    
# Read image bytes from stdin, decode them and show the result in a window.
while True:
    data = sys.stdin.buffer.read()  # no size given: returns only once stdin reaches EOF
    img_np = cv2.imdecode(np.frombuffer(BytesIO(data).read(), np.uint8), cv2.IMREAD_UNCHANGED)
    cv2.imshow('image', img_np)
    cv2.waitKey(0)  # a delay of 0 blocks until a key is pressed

If I output the write.py data to terminal, it prints. If I manually hand data to read.py that gets read. But put them together (python3 write.py | python3 read.py) and it just hangs. write.py just writes once, and read.py never seems to get it.

My guess is that the read code is waiting for the write code to "end" before it wraps up the data package and calls it an image. Though if that were the case, I would think that doing a flush would fix it.

like image 270
Ivan Viti Avatar asked Feb 24 '21 15:02

Ivan Viti


3 Answers

You have mentioned that your image to send is not a consistent size, but I have to assume if it's coming from the same camera (for a given video stream) the raw image size does not change, rather just the compressed image size. I would imagine you likely have plenty of RAM to store at least one frame un-compressed in memory at a time, and you're just introducing processing overhead with all the compression and decompression.

Given that I would create a shared buffer using multiprocessing.shared_memory which can share frames between the two processes (you can even create a circular buffer of a couple frames if you wanna get real fancy, and prevent screen tearing, but it wasn't a big problem in my test)

Given that cv2.VideoCapture().read() can read straight into an existing array, and you can create a numpy array which uses the shared memory as its buffer, you can read the data into the shared memory with zero extra copying. Using this I was able to read nearly 700 frames per second from a video file encoded with H.264 at 1280x688 resolution.

from multiprocessing.shared_memory import SharedMemory
import cv2
from time import sleep
import numpy as np

vid_device = r"D:\Videos\movies\GhostintheShell.mp4" #a great movie

#get the first frame to calculate size
cap = cv2.VideoCapture(vid_device)
success, frame = cap.read()
if not success:
    raise Exception("error reading from video")

#create a shared memory for sending the frame shape
#  (4 bytes per dim as long as int32 is big enough)
frame_shape_shm = SharedMemory(name="frame_shape", create=True, size=frame.ndim*4)
#size the view from frame.ndim too, so it always matches the allocation above
frame_shape = np.ndarray(frame.ndim, buffer=frame_shape_shm.buf, dtype='i4')
frame_shape[:] = frame.shape

#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer", create=True, size=frame.nbytes)
frame_buffer = np.ndarray(frame.shape, buffer=frame_buffer_shm.buf, dtype=frame.dtype)
frame_buffer[:] = frame #publish the frame we already decoded instead of dropping it

input("writer is ready: press enter once reader is ready")

try: #use keyboardinterrupt to quit
    while True:
        #read() decodes straight into the shared buffer and reports success first
        success, _ = cap.read(frame_buffer)
        if not success:
            break #end of stream or device error: stop instead of spinning on failed reads
        #add sleep(1/24) here to limit framerate-ish
        #  (hitting actual framerate is more complicated than 1 line)
except KeyboardInterrupt:
    pass
finally:
    #cleanup: IMPORTANT, close this one first so the reader doesn't unlink() the
    #  shm's before this file has exited. (less important on windows)
    #  Runs in `finally` so the handles are released on any exit path.
    cap.release()
    frame_buffer_shm.close()
    frame_shape_shm.close()

The reader process looks very similar, but instead of creating a video device, and reading frames, we just construct the shared array, and imshow a bunch. The GUI isn't quite as fast as just dumping the data, so we don't get quite 700 fps, but up to 500's isn't bad...

from multiprocessing.shared_memory import SharedMemory
import cv2
import numpy as np

#create a shared memory for reading the frame shape
#  (no create=True: this attaches to the existing block named "frame_shape")
frame_shape_shm = SharedMemory(name="frame_shape")
#three int32 entries are read here, so the writer must have stored a 3-dimensional shape
frame_shape = np.ndarray([3], buffer=frame_shape_shm.buf, dtype='i4')

#create the shared memory for the frame buffer
frame_buffer_shm = SharedMemory(name="frame_buffer")

#create the framebuffer using the shm's memory
#  (a view onto the shared bytes interpreted as uint8, not a copy)
frame_buffer = np.ndarray(frame_shape, buffer=frame_buffer_shm.buf, dtype='u1')
try:
    while True:
        #the array is backed by the shared block, so each imshow reads its current contents
        cv2.imshow('frame', frame_buffer)
        cv2.waitKey(1) #this is needed for cv2 to update the gui
except KeyboardInterrupt:
    pass

#cleanup: IMPORTANT the writer process should close before this one, so nothing 
#  tries to access the shm after unlink() is called. (less important on windows)
#  Each block is closed (this process's handle) before it is unlinked (removed).
frame_buffer_shm.close()
frame_buffer_shm.unlink()
frame_shape_shm.close()
frame_shape_shm.unlink()

EDIT: the user's other questions suggested a version of python earlier than 3.8 may be a requirement (or even working across versions), so here's an example of using posix_ipc in-place of multiprocessing.shared_memory to create the frame buffer (and how to clean it up):

#creation
#  (fragment: assumes posix_ipc, mmap and numpy as np are imported, and that
#   `frame` already holds the first captured frame)
shm = posix_ipc.SharedMemory(name="frame_buf", 
                             flags=posix_ipc.O_CREX, #if this fails, cleanup didn't happen properly last time
                             size=frame.nbytes)
shm_map = mmap.mmap(shm.fd, shm.size) #map the whole segment into this process
buf = memoryview(shm_map)
#create the frame buffer
frame_buffer = np.ndarray(frame.shape, buffer=buf, dtype=frame.dtype)
frame_buffer[:] = frame[:] #copy first frame into frame buffer

#cleanup
shm.close_fd() #can happen after opening mmap
buf.release() #must happen after frame_buffer is no longer needed and before closing mmap
shm_map.close()
shm.unlink() #must only call from one of the two processes. unlink tells the os to reclaim the space once all handles are closed.
like image 107
Aaron Avatar answered Oct 26 '22 17:10

Aaron


I think I figured it out. In read.py, sys.stdin.buffer.read() reads and waits until the stdin pipe is closed but write.py never actually closes its stdout because of the while True loop. This proof of concept simplified example works:

write.py

import sys
import time

# Send one message and flush it so it reaches the pipe right away
sys.stdout.buffer.write(b"Hello world")
sys.stdout.buffer.flush()

# Note: if we comment out the code below, it works again
while True:
    # Keep this process alive (so its stdout stays open), but sleep instead
    # of spinning in `while True: pass` so the CPU is not kept busy
    time.sleep(10)

and read.py

import sys

# read() without a size only returns at EOF, i.e. once the writer closes its end of the pipe
with open("output.txt", "w") as file:
    file.write(sys.stdin.read())

This will also hang and will never actually write anything to "output.txt". If we remove the while True loop from write.py the code will no longer hang and "Hello world" will be written to "output.txt", because when write.py finishes writing its process exits and that closes the pipe. To fix this issue I recommend changing read.py to something like this:

import sys

# Append incoming text to the file as it arrives instead of waiting for EOF.
# The file is opened once rather than once per character.
with open("output.txt", "a") as file:
    while True:
        # Ask for a single character so the call can return without waiting for EOF
        char = sys.stdin.read(1)
        if not char:
            # An empty string means the writer closed the pipe: stop instead
            # of looping forever on empty reads
            break
        file.write(char)
        # Flush per character so the data is on disk immediately, as it was
        # when the file was reopened (and closed) for every character
        file.flush()

Solution:

write.py

import sys
import time

# Width of the length header: the payload length is sent as this many characters
MAX_FILE_SIZE = 16 # bytes

msg = b"Hello world"

# `read.py` cannot tell where a message ends, so we first send the number
# of bytes that follow.
length = len(msg)
# The length itself must have a known size too, otherwise we are back at the
# same problem. So it is always sent as exactly `MAX_FILE_SIZE` characters,
# padded with `0`s at the start.
# https://stackoverflow.com/a/339013/11106801
length = str(length).zfill(MAX_FILE_SIZE)
sys.stdout.buffer.write(length.encode())

sys.stdout.buffer.write(msg)
sys.stdout.buffer.flush()

# We also need to tell `read.py` whether this was the last file that we send.
# Sending `1` means that no more files follow
sys.stdout.buffer.write(b"1")
sys.stdout.buffer.flush()

# Note: if we comment out the code below, it works as well
while True:
    # Keep this process alive (so its stdout stays open), but sleep instead
    # of spinning in `while True: pass` so the CPU is not kept busy
    time.sleep(10)

and read.py

import sys
import time

# Width of the length header sent by `write.py`, in bytes
MAX_FILE_SIZE = 16 # bytes

while True:
    # read(n) on the buffered binary stdin blocks until n bytes have arrived
    # (or the pipe hits EOF), so there is no need to sleep and hope that
    # `write.py` has sent the data in the meantime.
    header = sys.stdin.buffer.read(MAX_FILE_SIZE)
    if len(header) < MAX_FILE_SIZE:
        # The writer closed the pipe before a complete header arrived:
        # nothing more to read, and int() on a partial header would fail.
        break
    # The header holds the size of the incoming file as zero-padded digits
    length = int(header)

    # Here you can switch to a different file every time `write.py`
    # sends a new file
    with open("output.txt", "wb") as file:
        file.write(sys.stdin.buffer.read(length))

    # One flag byte follows the payload: b"1" means that was the last file,
    # anything else means another header comes next.
    file_ended = sys.stdin.buffer.read(1)
    if file_ended == b"1":
        break

Edit: The solution works like this:

  1. Send the size of the file
  2. Send the actual file data
  3. Send a byte that tells read.py whether it should expect another file or not

For part 1, we just encode the length of the file as a string that is padded with 0s at the front. Note: Make sure that the MAX_FILE_SIZE is larger than the size of the largest file (large numbers will slightly decrease the performance). For part 3, if we send a "1" it will mean that there are no more files to be sent. Otherwise read.py will wait and accept the next file. So write.py will become:

from math import log
import time
import sys
import cv2


MAX_FILE_SIZE = 62914560 # bytes
# Rebind the name to the width of the length header: the bit length of the
# limit above (26). The header holds decimal digits, so this is wider than
# strictly needed (8 digits would do); it only has to match `read.py`.
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE, 2)+1)


def write_file(buffer, data, last_file=False):
    """Write one length-prefixed file to *buffer* and flush it.

    The bytes written are, in order:

    1. ``len(data)`` as decimal digits, zero-padded at the front to exactly
       ``MAX_FILE_SIZE`` characters, so `read.py` always knows how many
       header bytes to consume (https://stackoverflow.com/a/339013/11106801);
    2. the payload itself;
    3. one flag byte: ``b"1"`` if this was the last file, ``b"0"`` otherwise.

    Args:
        buffer: Binary stream with ``write`` and ``flush``, e.g.
            ``sys.stdout.buffer``.
        data: Bytes-like payload. Its ``len()`` is sent as the byte count, so
            it is assumed to hold one byte per element.
        last_file: True if no further file will follow this one.

    Raises:
        ValueError: If the payload length needs more than ``MAX_FILE_SIZE``
            digits. Nothing is written in that case, because a wider header
            would make the reader misparse everything that follows.
    """
    header = str(len(data)).zfill(MAX_FILE_SIZE).encode()
    # zfill() pads but never truncates, so an oversized length has to be
    # rejected explicitly to keep the header at its fixed width.
    if len(header) > MAX_FILE_SIZE:
        raise ValueError(
            f"payload of {len(data)} bytes does not fit in a "
            f"{MAX_FILE_SIZE}-digit length header"
        )
    buffer.write(header)

    # Write the actual data
    buffer.write(data)

    # Tell `read.py` whether that was the last file that we send
    buffer.write(b"1" if last_file else b"0")
    buffer.flush()


# Stream the same picture over and over, one framed JPEG per iteration.
while True:
    img = cv2.imread("img.jpg")
    bimg = cv2.imencode(".jpg", img)[1]  # imencode returns (success, buffer); keep only the buffer
    # Call write_file
    write_file(sys.stdout.buffer, bimg, last_file=False)
    # time.sleep(1) # Don't need this

and read.py will become:

from io import BytesIO
from math import log
import numpy as np
import time
import cv2
import sys


MAX_FILE_SIZE = 62914560 # bytes
# Rebind the name to the width of the length header: the bit length of the
# limit above (26). The header holds decimal digits, so this is wider than
# strictly needed (8 digits would do); it only has to match `write.py`.
MAX_FILE_SIZE = int(log(MAX_FILE_SIZE, 2)+1)


def read(buffer, number_of_bytes):
    """Read exactly *number_of_bytes* bytes from *buffer*.

    A single ``buffer.read(n)`` may return fewer than ``n`` bytes, so this
    keeps reading until the requested amount has been collected.

    Args:
        buffer: Binary stream with a ``read(size)`` method, e.g.
            ``sys.stdin.buffer``.
        number_of_bytes: Number of bytes to return; 0 yields ``b""``.

    Returns:
        A ``bytes`` object of length *number_of_bytes*.

    Raises:
        EOFError: If the stream ends before enough bytes were read.
    """
    chunks = []
    remaining = number_of_bytes
    while remaining > 0:
        chunk = buffer.read(remaining)
        if not chunk:
            # An empty read means EOF; without this check the loop would
            # spin forever once the writer has closed the pipe.
            raise EOFError(
                f"stream ended with {remaining} of {number_of_bytes} "
                "bytes still missing"
            )
        chunks.append(chunk)
        remaining -= len(chunk)
    # Join once at the end instead of re-copying the data on every chunk
    return b"".join(chunks)


def read_file(buffer):
    """Read one framed file from *buffer*.

    Consumes, in order: a ``MAX_FILE_SIZE``-byte header holding the payload
    size as digits, that many payload bytes, and one trailing flag byte.

    Returns:
        A ``(data, is_last)`` tuple, where ``is_last`` is True when the flag
        byte is ``b"1"``, i.e. the writer announced no further files.
    """
    # Fixed-width header first, so we know how much payload to expect
    header = read(buffer, MAX_FILE_SIZE)
    payload_size = int(header)

    # Then the payload itself
    payload = read(buffer, payload_size)

    # And finally the byte that says whether this was the last file
    flag = read(buffer, 1)
    is_last = flag == b"1"

    return payload, is_last


# Show every incoming frame until the writer flags the last one.
while True:
    print("Reading file")
    data, last_file = read_file(sys.stdin.buffer)

    # View the received bytes as a uint8 array and let OpenCV decode it
    encoded = np.frombuffer(data, np.uint8)
    img_np = cv2.imdecode(encoded, cv2.IMREAD_UNCHANGED)

    cv2.imshow("image", img_np)
    cv2.waitKey(0)  # a delay of 0 blocks until a key is pressed

    if last_file:
        break
like image 33
TheLizzard Avatar answered Oct 26 '22 15:10

TheLizzard


Two Solutions: ZeroMQ | DiskCache

It is quite easy to send frames from one python file to another using ZeroMQ.

ZeroMQ


Install via PyPI: pip install -U pyzmq. There are multiple ways to send frames. This is an example of using PUBLISHER and SUBSCRIBER

# writer | publisher
import base64
import time
import zmq
import cv2


# Prepare our context and publisher
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5563")

CAM_INDEX_OR_URI = 0
capture = cv2.VideoCapture(CAM_INDEX_OR_URI)
assert capture.isOpened(), "Cannot open camera"

while True:
    # Each iteration sends one two-part message: an envelope and the frame

    # capture frame-by-frame
    ret, frame = capture.read()
    if not ret:
        print("[+] No frame received. Stream ended.")
        break

    # resize the frame
    frame = cv2.resize(frame, (640, 480))
    # imencode returns (success, buffer); `encoded` is not checked here
    encoded, buffer = cv2.imencode(".jpg", frame)

    #  all is good
    # cv2.imshow("Frames", frame)

    # stop with Esc key (27)
    if cv2.waitKey(1) == 27:
        break

    # base64 text on the wire; the subscriber has to decode it the same way
    sent_frame = base64.b64encode(buffer)
    publisher.send_multipart([b"camera_A", sent_frame])

    time.sleep(0.01)
  

# Reached once the loop breaks (stream ended or Esc): clean up
publisher.close()
context.term()

capture.release()
cv2.destroyAllWindows()
# reader.py | subscriber

import numpy as np
import base64
import zmq
import cv2

# Prepare our context and subscriber
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5563")
# Only receive messages whose envelope starts with "camera_A"
subscriber.setsockopt_string(zmq.SUBSCRIBE, "camera_A")

# Create the window once up front instead of on every frame
cv2.namedWindow("Frames", cv2.WINDOW_NORMAL)

try:
    while True:
        # Read envelope with address, followed by the base64-encoded JPEG
        address, contents = subscriber.recv_multipart()

        jpeg_bytes = np.frombuffer(base64.b64decode(contents), dtype=np.uint8)
        frame = cv2.imdecode(jpeg_bytes, cv2.IMREAD_COLOR)
        if frame is None:
            # imdecode yields None for data it cannot decode; drop that
            # frame rather than letting imshow fail on it
            continue

        cv2.imshow("Frames", frame)

        # stop with Esc key (27)
        if cv2.waitKey(1) == 27:
            break
finally:
    # Release the socket, context and window on any exit path
    subscriber.close()
    context.term()
    cv2.destroyAllWindows()

DiskCache


You could also consider using diskcache. It allows passing python objects through memory. It is like Redis but all Python and does not require a server. NB: pip install --upgrade diskcache. You can tweak to start sending live frames from camera | video

# writer.py
import time
from pathlib import Path
import diskcache as dc
import cv2

# Directory that backs the cache
tmp = Path("/tmp/stream")

with dc.Cache(tmp) as frame_queue:
    print(f"[+] Ready to push data to {tmp}.")
    while True:
        # Load the picture again and enqueue it; each entry is pushed with expire=5
        picture = cv2.imread("cat.jpg")
        frame_queue.push(picture, expire=5)
        # Wait before pushing the next one
        time.sleep(10)


# reader.py

import time
from pathlib import Path
import diskcache as dc
import cv2

# Directory that backs the cache
tmp = Path("/tmp/stream")

with dc.Cache(tmp) as cache:
    print(f"[+] Ready to pull data from {tmp}")
    while True:
        # With expire_time=True a hit comes back as ((key, value), expire_time),
        # but an empty queue returns `default` unchanged. The stock default
        # (None, None) cannot be unpacked into that nested shape, so pass a
        # default with the same shape as a hit.
        (key, value), _ = cache.pull(expire_time=True,
                                     default=((None, None), None))
        if key:
            cv2.imshow("cat", value)
            cv2.waitKey(0)  # a delay of 0 blocks until a key is pressed
            cv2.destroyAllWindows()
        # Poll again shortly; the queue may simply be empty right now
        time.sleep(0.1)

I would go in these directions rather than using sys, because you have total control over the stream data. See the diskcache documentation.

like image 42
Prayson W. Daniel Avatar answered Oct 26 '22 17:10

Prayson W. Daniel