I'm trying to do the following: receive a video stream using GStreamer and process it with OpenCV. I've found a few solutions, and one of them is to write the video (from GStreamer) into a fifo and then read it using OpenCV (OPTION3 here: MJPEG streaming and decoding). The problem is that I can't open the pipe: cvCreateFileCapture() just never returns. Here is part of the code I wrote:
if(mkfifo("fifo.avi", S_IRUSR| S_IWUSR) == -1)
{
cout<<"Cant create fifo"<<endl;
cout<<errno<<endl;
}
loop = g_main_loop_new(NULL, false);
fsink = gst_element_factory_make("filesink", "fsink");
g_object_set(G_OBJECT(fsink), "location", "fifo.avi", NULL);
playbin = gst_element_factory_make("playbin2", "play");
g_object_set(G_OBJECT(playbin), "uri", uri.c_str(), NULL);
g_object_set(G_OBJECT(playbin), "video-sink", fsink, NULL);
bus = gst_pipeline_get_bus(GST_PIPELINE(playbin));
gst_bus_add_signal_watch(bus);
g_signal_connect(bus, "message::buffering", G_CALLBACK(&set_playbin_state), playbin);
gst_object_unref(bus);
cvNamedWindow("output", CV_WINDOW_AUTOSIZE);
capture = cvCreateFileCapture("fifo.avi");
The program gets stuck on the last line. PS: I'm using OpenCV 2.3.1.
Here is my complete source code solution for Gstreamer 1.4.0 and OpenCV 2.4.9.
It uses gst_parse_launch() to parse a normal command line that you would give to gst-launch. The GStreamer pipeline converts frames to RGB888 format before feeding them to OpenCV, so that the conversion is as easy as possible.
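As a sanity check, the same description can be tried from a shell first, with autovideosink standing in for the appsink (this command is my addition, not part of the program):
gst-launch-1.0 videotestsrc pattern=ball ! video/x-raw,format=RGB ! videoconvert ! autovideosink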
OpenCV frame processing is not done in the new_sample() callback; it only grabs the frame from GStreamer and pushes it to a queue, which is then consumed in the main thread. This way we may call e.g. imshow() from OpenCV to actually render the image to the screen.
It's ~150 lines; by removing debug prints etc. it could be reduced to under 100 lines of code.
One should probably add thread synchronization around the deque reads/writes, as sketched below.
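For reference, here is a minimal synchronization sketch, assuming a single std::mutex guards the shared deque (the helper names pushFrame/popFrame are illustrative, not part of the program below):
#include <deque>
#include <mutex>
#include "opencv2/opencv.hpp"

static std::deque<cv::Mat> frameQueue;
static std::mutex frameQueueMutex;

// Producer side: called from the appsink streaming thread.
void pushFrame(const cv::Mat &frame)
{
    std::lock_guard<std::mutex> lock(frameQueueMutex);
    frameQueue.push_back(frame.clone()); // clone so the pixels outlive the mapped buffer
}

// Consumer side: called from the main thread; returns false when no frame is ready.
bool popFrame(cv::Mat &out)
{
    std::lock_guard<std::mutex> lock(frameQueueMutex);
    if (frameQueue.empty())
        return false;
    out = frameQueue.front();
    frameQueue.pop_front();
    return true;
}
The full program follows: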
#include <gst/gst.h>
#include <gst/app/gstappsink.h>
#include <stdlib.h>
#include "opencv2/opencv.hpp"
using namespace cv;
// TODO: use synchronized deque
std::deque<Mat> frameQueue;
GstFlowReturn
new_preroll(GstAppSink *appsink, gpointer data) {
g_print ("Got preroll!\n");
return GST_FLOW_OK;
}
GstFlowReturn
new_sample(GstAppSink *appsink, gpointer data) {
static int framecount = 0;
framecount++;
GstSample *sample = gst_app_sink_pull_sample(appsink);
GstCaps *caps = gst_sample_get_caps(sample);
GstBuffer *buffer = gst_sample_get_buffer(sample);
const GstStructure *info = gst_sample_get_info(sample);
// ---- Read frame and convert to opencv format ---------------
GstMapInfo map;
gst_buffer_map (buffer, &map, GST_MAP_READ);
// convert gstreamer data to an OpenCV Mat; the height / width could
// actually be resolved from the caps, as sketched below
Mat frame(Size(320, 240), CV_8UC3, (char*)map.data, Mat::AUTO_STEP);
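// A sketch of resolving the size from the caps instead of hard-coding it
// (assumes the caps carry integer "width" / "height" fields, as raw video caps do):
// GstStructure *s = gst_caps_get_structure(caps, 0);
// int width = 0, height = 0;
// gst_structure_get_int(s, "width", &width);
// gst_structure_get_int(s, "height", &height);
// Mat frame(Size(width, height), CV_8UC3, (char*)map.data, Mat::AUTO_STEP);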
int frameSize = map.size;
// TODO: synchronize this.... (see the sketch above)
frameQueue.push_back(frame.clone()); // clone: map.data is invalid after gst_buffer_unmap()
gst_buffer_unmap(buffer, &map);
// ------------------------------------------------------------
// print dot every 30 frames
if (framecount%30 == 0) {
g_print (".");
}
// show caps on first frame
if (framecount == 1) {
g_print ("%s\n", gst_caps_to_string(caps));
}
gst_sample_unref (sample);
return GST_FLOW_OK;
}
static gboolean
my_bus_callback (GstBus *bus, GstMessage *message, gpointer data) {
g_print ("Got %s message\n", GST_MESSAGE_TYPE_NAME (message));
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_ERROR: {
GError *err;
gchar *debug;
gst_message_parse_error (message, &err, &debug);
g_print ("Error: %s\n", err->message);
g_error_free (err);
g_free (debug);
break;
}
case GST_MESSAGE_EOS:
/* end-of-stream */
break;
default:
/* unhandled message */
break;
}
/* we want to be notified again the next time there is a message
* on the bus, so returning TRUE (FALSE means we want to stop watching
* for messages on the bus and our callback should not be called again)
*/
return TRUE;
}
int
main (int argc, char *argv[])
{
GError *error = NULL;
gst_init (&argc, &argv);
gchar *descr = g_strdup(
"videotestsrc pattern=ball ! "
"video/x-raw,format=RGB ! "
"videoconvert ! "
"appsink name=sink sync=true"
);
GstElement *pipeline = gst_parse_launch (descr, &error);
if (error != NULL) {
g_print ("could not construct pipeline: %s\n", error->message);
g_error_free (error);
exit (-1);
}
/* get sink */
GstElement *sink = gst_bin_get_by_name (GST_BIN (pipeline), "sink");
gst_app_sink_set_emit_signals((GstAppSink*)sink, true);
gst_app_sink_set_drop((GstAppSink*)sink, true);
gst_app_sink_set_max_buffers((GstAppSink*)sink, 1);
GstAppSinkCallbacks callbacks = { NULL, new_preroll, new_sample };
gst_app_sink_set_callbacks (GST_APP_SINK(sink), &callbacks, NULL, NULL);
GstBus *bus;
guint bus_watch_id;
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
bus_watch_id = gst_bus_add_watch (bus, my_bus_callback, NULL);
gst_object_unref (bus);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
namedWindow("edges",1);
while(1) {
g_main_iteration(false);
// TODO: synchronize...
if (frameQueue.size() > 0) {
// this lags pretty badly even when grabbing frames from webcam
Mat frame = frameQueue.front();
Mat edges;
cvtColor(frame, edges, CV_RGB2GRAY);
GaussianBlur(edges, edges, Size(7,7), 1.5, 1.5);
Canny(edges, edges, 0, 30, 3);
imshow("edges", edges);
cv::waitKey(30);
frameQueue.clear();
}
}
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
gst_object_unref (GST_OBJECT (pipeline));
return 0;
}
To compile on OS X / Linux, a makefile like this should do:
GST_FLAGS=$(shell pkg-config --cflags --libs gstreamer-gl-1.0 gstreamer-tag-1.0 gstreamer-net-1.0 gstreamer-sdp-1.0 \
gstreamer-1.0 gstreamer-allocators-1.0 gstreamer-insertbin-1.0 gstreamer-plugins-base-1.0 \
gstreamer-codecparsers-1.0 gstreamer-base-1.0 gstreamer-app-1.0 gstreamer-check-1.0 \
gstreamer-controller-1.0 gstreamer-video-1.0 gstreamer-fft-1.0 gstreamer-mpegts-1.0 \
gstreamer-pbutils-1.0 gstreamer-rtp-1.0 gstreamer-rtsp-1.0 \
gstreamer-riff-1.0 gstreamer-audio-1.0 gstreamer-plugins-bad-1.0)
OPENCV_FLAGS=$(shell pkg-config --cflags --libs opencv)
all: gst_opencv
gst_opencv: gst_opencv.cpp
	g++ $(GST_FLAGS) $(OPENCV_FLAGS) gst_opencv.cpp -o gst_opencv
clean:
rm -f gst_opencv
So, after searching for a while, I found a solution, which involves retrieving data from the buffer. The idea is to create a playbin and set an appsink as its "video-sink". Here is a code sample:
cout<<"Creating appsink"<<endl;
appsink = gst_element_factory_make("appsink", "asink");
gst_app_sink_set_emit_signals((GstAppSink*)appsink, true);
gst_app_sink_set_drop((GstAppSink*)appsink, true);
gst_app_sink_set_max_buffers((GstAppSink*)appsink, 1);
//creating and initialising pipeline
g_object_set(G_OBJECT(playbin), "video-sink", appsink, NULL);
g_signal_connect(appsink, "new-buffer", G_CALLBACK(DisplayFrame), (gpointer) mark);
//callback function looks like this
gboolean Core::DisplayFrame(GstAppSink *fks, gpointer mark)
{
static bool init = false;
static IplImage *frame;
GstBuffer* buf;
Mark* mk = (Mark*) mark;
if(!init)
{
init = true;
frame = cvCreateImage(cvSize(mk->frame_w, mk->frame_h), IPL_DEPTH_8U, 1);
}
buf = gst_app_sink_pull_buffer(fks);
frame->imageData = (char*)GST_BUFFER_DATA(buf);
ProcessFrame(frame);
gst_buffer_unref(buf);
return true;
}
This works. P.S. There's a lot of info about this method, but I spent a lot of time finding it, so I decided to post it here in order to provide at least some keywords to search for.
UPDATE. And a bit more information about connecting GStreamer and OpenCV: it's about converting the buffer to an IplImage now. First of all, we need to receive an RGB buffer to make the conversion as easy as possible. In order to do this, we'll replace the plain appsink with a small sink bin: an appsink connected to an ffmpegcolorspace element.
cout<<"Creating appsink"<<endl;
appsink = gst_element_factory_make("appsink", "asink");
gst_app_sink_set_emit_signals((GstAppSink*)appsink, true);
gst_app_sink_set_drop((GstAppSink*)appsink, true);
gst_app_sink_set_max_buffers((GstAppSink*)appsink, 1);
csp = gst_element_factory_make("ffmpegcolorspace", "csp");
sinkpipe = gst_pipeline_new("sinkp");
gst_bin_add_many(GST_BIN(sinkpipe), csp, appsink, NULL);
gst_element_link_filtered(csp, appsink, gst_caps_new_simple("video/x-raw-rgb", NULL));
pad = gst_element_get_static_pad(csp, "sink");
gst_element_add_pad(sinkpipe, gst_ghost_pad_new("ghost", pad));
g_object_unref(pad);
//...
g_object_set(G_OBJECT(playbin), "video-sink", sinkpipe, NULL);
//...
g_signal_connect(appsink, "new-buffer", G_CALLBACK(GetFrame), (gpointer) mark);
//...
//caps_struct can be retrieved by installing a data probe;
//search for it in the GStreamer manual
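//a sketch of such a probe (GStreamer 0.10 API; the name buffer_probe is illustrative):
//static gboolean buffer_probe(GstPad *p, GstBuffer *buf, gpointer data)
//{
//    GstCaps *caps = gst_buffer_get_caps(buf);
//    const GstStructure *caps_struct = gst_caps_get_structure(caps, 0);
//    //... read the fields you need, then gst_caps_unref(caps)
//    return TRUE; //keep passing buffers through
//}
//installed once with gst_pad_add_buffer_probe(pad, G_CALLBACK(buffer_probe), NULL);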
cout<<"Getting frame resolution"<<endl;
gst_structure_get_int(caps_struct, "width", &(mark->frame_w));
gst_structure_get_int(caps_struct, "height", &(mark->frame_h));
gst_structure_get_int(caps_struct, "depth", &depth);
mark->GeneratePoints();
frame = cvCreateImage(cvSize(mark->frame_w, mark->frame_h), depth/3, 3);
//callback function
gboolean Core::GetFrame(GstAppSink *fks, gpointer frame)
{
IplImage* frame_temp = (IplImage*) frame;
IplImage* frame_temp_two = cvCloneImage(frame_temp);
GstBuffer* buf;
buf = gst_app_sink_pull_buffer(fks);
frame_temp_two->imageData = (char*) GST_BUFFER_DATA(buf);
cvConvertImage(frame_temp_two, frame_temp, CV_CVTIMG_SWAP_RB);
ProcessFrame(frame_temp);
gst_buffer_unref(buf);
return true;
}
I hope this will help somebody.
I can't comment on Mikael's post (not enough points for that). I made some modifications and corrections to the program; you can check them here or in my gist: https://gist.github.com/patrickelectric/5dca1cb7cef4ffa7fbb6fb70dd9f9edc
/**
* Based on:
* https://stackoverflow.com/questions/10403588/adding-opencv-processing-to-gstreamer-application
*/
// Include atomic std library
#include <atomic>
// Include gstreamer library
#include <gst/gst.h>
#include <gst/app/app.h>
// Include OpenCV library
#include <opencv2/opencv.hpp>
// Share frame between main loop and gstreamer callback
std::atomic<cv::Mat*> atomicFrame;
/**
* @brief Check preroll to get a new frame using callback
* https://gstreamer.freedesktop.org/documentation/design/preroll.html
* @return GstFlowReturn
*/
GstFlowReturn new_preroll(GstAppSink* /*appsink*/, gpointer /*data*/)
{
return GST_FLOW_OK;
}
/**
* @brief This is a callback that gets a new frame when a preroll exists
*
* @param appsink
* @return GstFlowReturn
*/
GstFlowReturn new_sample(GstAppSink *appsink, gpointer /*data*/)
{
static int framecount = 0;
// Get caps and frame
GstSample *sample = gst_app_sink_pull_sample(appsink);
GstCaps *caps = gst_sample_get_caps(sample);
GstBuffer *buffer = gst_sample_get_buffer(sample);
GstStructure *structure = gst_caps_get_structure(caps, 0);
const int width = g_value_get_int(gst_structure_get_value(structure, "width"));
const int height = g_value_get_int(gst_structure_get_value(structure, "height"));
// Print dot every 30 frames
if(!(framecount%30)) {
g_print(".");
}
// Show caps on first frame
if(!framecount) {
g_print("caps: %s\n", gst_caps_to_string(caps));
}
framecount++;
// Get frame data
GstMapInfo map;
gst_buffer_map(buffer, &map, GST_MAP_READ);
// Convert gstreamer data to OpenCV Mat; clone it so the pixel data
// stays valid after the buffer is unmapped below
cv::Mat mapped(cv::Size(width, height), CV_8UC3, (char*)map.data, cv::Mat::AUTO_STEP);
cv::Mat* prevFrame = atomicFrame.exchange(new cv::Mat(mapped.clone()));
if(prevFrame) {
delete prevFrame;
}
gst_buffer_unmap(buffer, &map);
gst_sample_unref(sample);
return GST_FLOW_OK;
}
/**
* @brief Bus callback
* Print important messages
*
* @param bus
* @param message
* @param data
* @return gboolean
*/
static gboolean my_bus_callback(GstBus *bus, GstMessage *message, gpointer data)
{
// Debug message
//g_print("Got %s message\n", GST_MESSAGE_TYPE_NAME(message));
switch(GST_MESSAGE_TYPE(message)) {
case GST_MESSAGE_ERROR: {
GError *err;
gchar *debug;
gst_message_parse_error(message, &err, &debug);
g_print("Error: %s\n", err->message);
g_error_free(err);
g_free(debug);
break;
}
case GST_MESSAGE_EOS:
/* end-of-stream */
break;
default:
/* unhandled message */
break;
}
/* we want to be notified again the next time there is a message
* on the bus, so returning TRUE (FALSE means we want to stop watching
* for messages on the bus and our callback should not be called again)
*/
return true;
}
int main(int argc, char *argv[]) {
gst_init(&argc, &argv);
gchar *descr = g_strdup(
"udpsrc port=5600 "
"! application/x-rtp, payload=96 ! rtph264depay ! h264parse ! avdec_h264 "
"! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert "
"! appsink name=sink emit-signals=true sync=false max-buffers=1 drop=true"
);
// Check pipeline
GError *error = nullptr;
GstElement *pipeline = gst_parse_launch(descr, &error);
if(error) {
g_print("could not construct pipeline: %s\n", error->message);
g_error_free(error);
exit(-1);
}
// Get sink
GstElement *sink = gst_bin_get_by_name(GST_BIN(pipeline), "sink");
/**
* @brief Get sink signals and check for a preroll
* If preroll exists, we do have a new frame
*/
gst_app_sink_set_emit_signals((GstAppSink*)sink, true);
gst_app_sink_set_drop((GstAppSink*)sink, true);
gst_app_sink_set_max_buffers((GstAppSink*)sink, 1);
GstAppSinkCallbacks callbacks = { nullptr, new_preroll, new_sample };
gst_app_sink_set_callbacks(GST_APP_SINK(sink), &callbacks, nullptr, nullptr);
// Declare bus
GstBus *bus;
bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
gst_bus_add_watch(bus, my_bus_callback, nullptr);
gst_object_unref(bus);
gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_PLAYING);
// Main loop
while(1) {
g_main_iteration(false);
cv::Mat* frame = atomicFrame.load();
if(frame) {
cv::imshow("Frame", atomicFrame.load()[0]);
cv::waitKey(30);
}
}
gst_element_set_state(GST_ELEMENT(pipeline), GST_STATE_NULL);
gst_object_unref(GST_OBJECT(pipeline));
return 0;
}
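For completeness, a plausible compile line for this version, assuming pkg-config knows about gstreamer-app-1.0 and opencv (the file name main.cpp is assumed; use opencv4 on newer installs):
g++ main.cpp -o gst_opencv $(pkg-config --cflags --libs gstreamer-app-1.0 opencv)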