Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Gstreamer change source element dynamically

Tags:

c++

gstreamer

I have a GStreamer pipeline that pulls video from an rtspsrc element. The rtspsrc element connects to an rtpjpegdepay element. I'd like to be able to change the RTSP URL on the fly. So far what I've been doing is:

1) unlinking the rtspsrc from the depay element

2) creating a new source element with the new RTSP URL

3) and linking to the depay element.

The problem that I am having is that the new RTSP source element is not linking correctly to the depay element, causing segfaults. I would like some help in figuring out how to change rtspsrc URL dynamically (while pipeline is still playing).

Pipeline creation:

// Builds the display pipeline. rtspsrc only exposes its source pad once the
// stream is set up, so it is connected to depay later from the "pad-added"
// handler; depay -> parser -> decoder -> vpe -> sink is linked statically here.
GstBus *bus;
guint busWatchId;
GstElement *src, *depay, *parser, *decoder, *vpe, *filter, *sink;
GstCaps *vpeCaps;

m_loop = g_main_loop_new(NULL, FALSE);

//create pipeline elements
m_cameraStream = gst_pipeline_new("display_pipeline");
src = gst_element_factory_make("rtspsrc", "rtspsrc");
depay = gst_element_factory_make("rtpjpegdepay", "depay");
parser = gst_element_factory_make("jpegparse", NULL);
decoder = gst_element_factory_make("ducatijpegdec", NULL);
vpe = gst_element_factory_make("vpe", NULL);
filter = gst_element_factory_make("capsfilter", NULL);
sink = gst_element_factory_make("waylandsink", NULL);

// Every pointer has to be tested on its own: the previous !(a || b || ...)
// form was only true when ALL of them were NULL, so a single missing plugin
// slipped through and crashed later.
if(!m_cameraStream || !src || !depay || !parser || !decoder || !vpe || !filter || !sink){
    qFatal("could not create pipeline elements");
    exit(1);
}

g_object_set(G_OBJECT(src), "location", "rtsp://192.168.50.29/av0_1", "latency", 0, NULL);
// depay is handed to the handler as user data so it can link the new pad to it
g_signal_connect(src, "pad-added", G_CALLBACK(on_rtsp_pad_added), depay);

//add src caps?
vpeCaps = gst_caps_from_string("video/x-raw, format=NV12, width=800, height=480");  //change this when Tomas' patch hits
if(!vpeCaps){
    qFatal("cannot create caps");
    exit(1);
}

// NOTE(review): filter receives the caps here but is neither added to the bin
// nor linked below, so these caps are not applied to the stream - confirm
// whether vpe -> filter -> sink was intended.
g_object_set(G_OBJECT(filter), "caps", vpeCaps, NULL);
g_object_set(G_OBJECT(sink), "sync", false, NULL);

//add and link elements to create full pipeline
gst_bin_add_many(GST_BIN(m_cameraStream), src, depay, parser, decoder, vpe, sink, NULL);
if(!gst_element_link_many(depay, parser, decoder, vpe, sink, NULL)){
    qFatal("cannot link elements");
    exit(1);
}

// the "caps" property took its own reference, so the local one can go
gst_caps_unref(vpeCaps);

// bus_call is invoked with m_loop as user data for every message on the pipeline bus
bus = gst_pipeline_get_bus(GST_PIPELINE(m_cameraStream));
busWatchId = gst_bus_add_watch(bus, GstBusFunc(bus_call), m_loop);
gst_object_unref(bus);

rtsp->depay linking callback function:

gchar *name;
GstElement *depay;
GstCaps *caps;

qDebug("on_rtsp_pad_added");
caps = gst_caps_from_string("application/x-rtp");
name = gst_pad_get_name(pad);
qDebug("on_rtsp_pad_added, rtspsrc pad name: %s", name);
depay = GST_ELEMENT(data);
if(!gst_element_link_pads_filtered(element, name, depay, "sink", caps)){
    qFatal("pad_added: failed to link elements");
}
g_free(name);
gst_element_set_state(m_cameraStream, GST_STATE_PLAYING);
g_main_loop_run(m_loop);

Source change function:

// Replaces the element named "rtspsrc" inside m_cameraStream with a freshly
// created rtspsrc while the pipeline keeps its current state. The new source
// is connected to depay later, from its "pad-added" handler.
qDebug("slot_changeSource");
//gst_element_set_state(m_cameraStream, GST_STATE_PAUSED); //GST_STATE_NULL: segfault in pad_added
                                                         //GST_STATE_PAUSED: pauses, never returns to playing or on_rtsp_pad_added
                                                         //GST_STATE_PLAYING(left playing): same as NULL
// look up the current source by name; released with gst_object_unref() at the end
GstElement* rtspsrc = gst_bin_get_by_name(GST_BIN(m_cameraStream), "rtspsrc");
if(rtspsrc){
    qDebug("rtspsrc found");
    // NOTE(review): gst_bin_get_by_name() is documented to return a new
    // reference; depay is never unreffed in this snippet - confirm.
    GstElement* depay = gst_bin_get_by_name(GST_BIN(m_cameraStream), "depay");
    if(depay){
        qDebug("depay found");
        // NOTE(review): if the pad-added handler linked the source through
        // gst_element_link_pads_filtered(), a capsfilter presumably sits between
        // rtspsrc and depay, in which case this call would not free depay's
        // sink pad - verify against the handler.
        gst_element_unlink(rtspsrc, depay);
        gst_bin_remove(GST_BIN(m_cameraStream), rtspsrc);
        // the replacement reuses the element name "rtspsrc" so the lookup above
        // finds it on the next call
        GstElement* newSource = gst_element_factory_make("rtspsrc", "rtspsrc");
        g_object_set(G_OBJECT(newSource), "location", "rtsp://192.168.50.29/av0_1", "latency", 0, NULL);
        g_signal_connect(newSource, "pad-added", G_CALLBACK(on_rtsp_pad_added), depay); //needed in the same way as the previous rtspsrc
        gst_bin_add(GST_BIN(m_cameraStream), newSource);
        // bring the new element to the state of the pipeline it was just added to
        gst_element_sync_state_with_parent(newSource);
        //gst_element_set_state(m_cameraStream, GST_STATE_PLAYING);
    }
    // the old source is shut down only after it has been removed from the bin
    // (this also runs when depay was not found and nothing was replaced)
    gst_element_set_state(rtspsrc, GST_STATE_NULL);
    gst_object_unref(rtspsrc);
}

Other things that I have tried:

1) Probing the src pad of the rtsp element, to ensure that there is not any data in the element. This seemed like a bad idea as the rtsp element would have been freshly created at this point.

2) Setting the pipeline to PAUSED or NULL, then changing the source element. This results in the pipeline being eternally paused.

References:

Gstreamer mailing list

Documentation

like image 395
T. Wallis Avatar asked May 18 '18 18:05

T. Wallis


2 Answers

Okay, so I believe I have found an answer, and I'm going to post it here to save some time for whoever stumbles upon this.

The answer is to create a pair of pad probes to handle the clearing of data from the pipeline. I did this by creating two pad probe callbacks: one to start the flush process, and another to recreate the rtspsrc element once the pipeline has been flushed. The first pad probe can be put anywhere, so I put it on my depay element. The second pad probe must be on the src pad of the last data-processing element, i.e. not on the final sink element. For the pipeline above, this is the 'vpe' element.

I do this by sending an End of Stream (EOS) event to the depay element, and then having a pad probe callback on the src pad of the vpe element catch the EOS as it exits the vpe. If the EOS reaches the waylandsink, the pipeline will simply close, and you'll have to restart the whole thing.

vpe = gst_bin_get_by_name(GST_BIN(data), "vpe");
srcPad = gst_element_get_static_pad(vpe, "src");
gst_pad_add_probe(srcPad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, event_probe, data, NULL);

//push EOS into the element, wait for the EOS to appear on the srcpad
depay = gst_bin_get_by_name(GST_BIN(data), "depay");
sinkPad = gst_element_get_static_pad(depay, "sink");
gst_pad_send_event(sinkPad, gst_event_new_eos());    

return GST_PAD_PROBE_OK;

And the callback for handling that EOS:

/*
 * Pad probe that waits for the EOS event and then swaps the RTSP source.
 *
 * pad:  pad the probe is installed on
 * info: probe info; only downstream events are inspected
 * data: the pipeline (used as a GstBin) that holds "rtspsrc" and "depay"
 *
 * Returns GST_PAD_PROBE_PASS for anything that is not EOS. On EOS the probe
 * removes itself, replaces the element named "rtspsrc" with a new one pointing
 * at NEW_URI and returns GST_PAD_PROBE_DROP so the EOS never reaches the sink.
 */
static GstPadProbeReturn event_probe(GstPad *pad, GstPadProbeInfo *info, gpointer data){
    GstElement *rtspsrcOld, *rtspsrcNew, *depay;
    GstPad *depaySink, *upstreamPeer;

    qDebug("event_probe");
    // A probe installed with a DATA_DOWNSTREAM mask is also called for
    // buffers; reading those as a GstEvent would be a type confusion.
    if(!(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM)){
        return GST_PAD_PROBE_PASS;
    }
    if(GST_EVENT_TYPE(GST_PAD_PROBE_INFO_DATA(info)) != GST_EVENT_EOS){
        return GST_PAD_PROBE_PASS;
    }

    gst_pad_remove_probe(pad, GST_PAD_PROBE_INFO_ID(info));

    rtspsrcOld = gst_bin_get_by_name(GST_BIN(data), "rtspsrc");
    if(!rtspsrcOld){
        return GST_PAD_PROBE_DROP;
    }
    qDebug("found rtspsrcOld");

    depay = gst_bin_get_by_name(GST_BIN(data), "depay");
    if(depay){
        gst_element_unlink(rtspsrcOld, depay);

        // If the source was linked with gst_element_link_pads_filtered(), a
        // capsfilter sits between it and depay and the call above unlinks
        // nothing. Detach whatever is still connected to depay's sink pad so
        // the replacement source can link to it.
        // NOTE(review): such a leftover capsfilter stays in the bin.
        depaySink = gst_element_get_static_pad(depay, "sink");
        if(depaySink){
            upstreamPeer = gst_pad_get_peer(depaySink);
            if(upstreamPeer){
                gst_pad_unlink(upstreamPeer, depaySink);
                gst_object_unref(upstreamPeer);
            }
            gst_object_unref(depaySink);
        }
        gst_object_unref(depay);
    }

    //remove old rtspsrc from pipeline
    gst_bin_remove(GST_BIN(data), rtspsrcOld);
    // NOTE(review): the reference on rtspsrcOld is deliberately kept. The old
    // element is still running, and GStreamer documents that state changes made
    // from a streaming thread can deadlock; presumably this probe runs on one.
    // Shut it down (GST_STATE_NULL) and unref it from the application thread,
    // or via gst_element_call_async() where available.

    rtspsrcNew = gst_element_factory_make("rtspsrc", "rtspsrc");
    if(rtspsrcNew){
        g_object_set(rtspsrcNew, "location", NEW_URI, "latency", 0, NULL);
        g_signal_connect(G_OBJECT(rtspsrcNew), "pad-added", G_CALLBACK(on_rtsp_pad_added), data);

        gst_bin_add(GST_BIN(data), rtspsrcNew);
        // re-applying PLAYING to the pipeline also starts the newly added child
        gst_element_set_state(GST_ELEMENT(data), GST_STATE_PLAYING);
    }

    return GST_PAD_PROBE_DROP;
}
like image 149
T. Wallis Avatar answered Oct 10 '22 00:10

T. Wallis


I tried to do the same thing. I have just started working with GStreamer. After understanding what T. Wallis meant, I wanted to test it with a simple pipeline. Unfortunately, the final linking of the new rtspsrc element with the pipeline is not working. However, I assume the error is somewhere else. I will go through the code again and read up on GStreamer's dynamic pipeline manipulation procedure, but I am not sure I will be able to find the error soon. Here is my code (it is my first time posting on Stack Overflow, so sorry for any faux pas):

#include <gst/gst.h>
#include <gst/gstpad.h>
#include <gst/rtsp/gstrtsp.h>
#include <unistd.h>
#include <time.h>
#include <stdbool.h>

/* State shared between main() and the signal / pad-probe callbacks. */
typedef struct _CustomData {
  GstElement *streaming_pipe; /* the pipeline; main() creates it with gst_pipeline_new() */
  GstElement *src;            /* the "rtspsrc" element created in main() */
  GstElement *depay;          /* "rtph264depay"; its sink pad is linked in on_rtsp_pad_added() */
  GstElement *decoder;        /* "avdec_h264"; cb_have_data() installs the EOS probe on its src pad */
  GstElement *sink;           /* "autovideosink" */
  GMainLoop *m_loop;          /* main loop run at the end of main() */
  gboolean change_url;        /* set by cb_have_data() to request a source swap, cleared once the swap is started */
  gboolean url1;              /* only initialised to false in main(); not read anywhere in the visible code */
  clock_t startT;             /* clock() value stored by on_rtsp_pad_added() and cb_have_data() to time the next swap */
} CustomData;

/*
 * "pad-added" handler for rtspsrc: connects the newly exposed source pad to
 * the sink pad of the depayloader and restarts the swap timer.
 *
 * element: the rtspsrc that exposed the pad
 * new_pad: the pad that was just added
 * data:    shared application state; data->depay is the link target
 */
static void on_rtsp_pad_added(GstElement *element, GstPad *new_pad,  CustomData *data){
    GstCaps *rtp_caps = gst_caps_from_string("application/x-rtp");
    GstCaps *pad_caps = gst_pad_query_caps(new_pad, NULL);
    gchar *name = gst_pad_get_name(new_pad);

    /*
     * Link directly instead of using gst_element_link_pads_filtered(): that
     * helper puts a hidden capsfilter in front of depay. When the source is
     * swapped later, gst_element_unlink(old_source, depay) then unlinks
     * nothing, depay's sink pad stays linked to the stale capsfilter and the
     * link attempt for the replacement source fails. The RTP requirement is
     * kept as an explicit check on the pad caps.
     */
    if(!gst_caps_can_intersect(pad_caps, rtp_caps)
       || !gst_element_link_pads(element, name, data->depay, "sink")){
        g_print("\npad_added: failed to link elements");
    }

    /* the pad name and both caps objects are owned by this function */
    g_free(name);
    gst_caps_unref(pad_caps);
    gst_caps_unref(rtp_caps);

    data->startT = clock();
}

/*
 * Pad probe that waits for the EOS event and then swaps the RTSP source.
 *
 * pad:  pad the probe is installed on
 * info: probe info; only downstream events are inspected
 * data: shared application state (pipeline, current source)
 *
 * Returns GST_PAD_PROBE_PASS for anything that is not EOS. On EOS the probe
 * removes itself, replaces the element named "rtspsrc" with a new one and
 * returns GST_PAD_PROBE_DROP so the EOS never reaches the sink.
 */
static GstPadProbeReturn event_probe(GstPad *pad, GstPadProbeInfo *info, CustomData *data){
    GstElement *rtspsrcOld, *rtspsrcNew, *depay;
    GstPad *depaySink, *upstreamPeer;

    /*
     * A probe installed with a DATA_DOWNSTREAM mask is also called for
     * buffers; reading those as a GstEvent would be a type confusion.
     */
    if(!(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM)
       || GST_EVENT_TYPE(GST_PAD_PROBE_INFO_DATA(info)) != GST_EVENT_EOS){
        g_print("\n Not an EOS event; pass probe return");
        return GST_PAD_PROBE_PASS;
    }

    gst_pad_remove_probe(pad, GST_PAD_PROBE_INFO_ID(info));

    rtspsrcOld = gst_bin_get_by_name(GST_BIN(data->streaming_pipe), "rtspsrc");
    if(!rtspsrcOld){
        return GST_PAD_PROBE_DROP;
    }

    depay = gst_bin_get_by_name(GST_BIN(data->streaming_pipe), "depay");
    if(depay){
        gst_element_unlink(rtspsrcOld, depay);

        /*
         * If the source was linked with gst_element_link_pads_filtered(), a
         * capsfilter sits between it and depay and the call above unlinks
         * nothing. Detach whatever is still connected to depay's sink pad so
         * the replacement source can link to it.
         * NOTE(review): such a leftover capsfilter stays in the bin.
         */
        depaySink = gst_element_get_static_pad(depay, "sink");
        if(depaySink){
            upstreamPeer = gst_pad_get_peer(depaySink);
            if(upstreamPeer){
                gst_pad_unlink(upstreamPeer, depaySink);
                gst_object_unref(upstreamPeer);
            }
            gst_object_unref(depaySink);
        }
        gst_object_unref(depay);
    }

    /* remove old rtspsrc from pipeline */
    gst_bin_remove(GST_BIN(data->streaming_pipe), rtspsrcOld);
    /*
     * NOTE(review): the reference on rtspsrcOld is deliberately kept. The old
     * element is still running, and GStreamer documents that state changes
     * made from a streaming thread can deadlock; this probe is reached from
     * the EOS that cb_have_data() sends inside a buffer probe. Shut it down
     * (GST_STATE_NULL) and unref it from the application thread, or via
     * gst_element_call_async() where available.
     */

    /*
     * The replacement keeps the name "rtspsrc": with any other name the lookup
     * above finds nothing on the next swap and the pipeline stays at EOS.
     */
    rtspsrcNew = gst_element_factory_make("rtspsrc", "rtspsrc");
    if(rtspsrcNew){
        g_object_set(rtspsrcNew, "location", "rtsp://xxxx/axis-media/media.amp?videocodec=h264&resolution=480x270", "latency", 0, NULL);
        g_signal_connect(rtspsrcNew, "pad-added", G_CALLBACK(on_rtsp_pad_added), data);

        gst_bin_add(GST_BIN(data->streaming_pipe), rtspsrcNew);
        data->src = rtspsrcNew; /* keep the shared state pointing at the live source */
        /* re-applying PLAYING to the pipeline also starts the newly added child */
        gst_element_set_state(GST_ELEMENT(data->streaming_pipe), GST_STATE_PLAYING);
        g_print("\n set playing\n");
    }

    return GST_PAD_PROBE_DROP;
}


/*
 * Buffer probe on the depayloader's src pad. Once more than 0.04 s of
 * clock() time has passed since data->startT it starts a source swap: an
 * EOS-catching probe is armed on the decoder's src pad and EOS is pushed into
 * the depayloader so that the elements in between are drained.
 *
 * pad:  pad the probe is installed on (unused)
 * info: probe info (unused)
 * data: shared application state
 *
 * Always returns GST_PAD_PROBE_OK, so the buffer continues downstream.
 */
static GstPadProbeReturn cb_have_data (GstPad *pad, GstPadProbeInfo *info, CustomData *data) {
    GstPad *srcPad, *sinkPad;
    double elapsed = ((double) (clock() - data->startT)) / CLOCKS_PER_SEC;

    g_print("\nPROBE CALLBACK!");
    g_print("Time: %f", elapsed);
    if(elapsed > 0.04){
        data->change_url = true;
        data->startT = clock();
    }

    if(data->change_url){
        g_print("\nIF PROBE CALLBACK!");
        /*
         * Clear the request before the EOS is sent, so that a buffer the
         * depayloader may push while handling the EOS cannot start a second
         * swap from inside this one.
         */
        data->change_url = false;

        /*
         * Only downstream events are routed to event_probe, which reads the
         * probe payload as a GstEvent. The cast matches the one used when
         * this callback itself is registered.
         */
        srcPad = gst_element_get_static_pad(data->decoder, "src");
        if(srcPad){
            gst_pad_add_probe(srcPad,
                              GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
                              (GstPadProbeCallback) event_probe, data, NULL);
            gst_object_unref(srcPad);
        }

        /* push EOS into the element, wait for the EOS to appear on the srcpad */
        sinkPad = gst_element_get_static_pad(data->depay, "sink");
        if(sinkPad){
            gst_pad_send_event(sinkPad, gst_event_new_eos());
            gst_object_unref(sinkPad);
        }
    }

    return GST_PAD_PROBE_OK;

}


int main(int argc, char *argv[])
{
/* Initialize GStreamer */
gst_init (&argc, &argv);
CustomData data;
GstStateChangeReturn ret;
GstPad *pad;

data.m_loop = g_main_loop_new(NULL, FALSE);

//create pipeline elements
data.streaming_pipe = gst_pipeline_new("display_pipeline");
data.src = gst_element_factory_make("rtspsrc", "rtspsrc");
data.depay = gst_element_factory_make("rtph264depay", "depay");
data.decoder = gst_element_factory_make("avdec_h264", "decoder");
data.sink = gst_element_factory_make("autovideosink", NULL);
data.change_url = false;
data.url1 = false;

if(!(data.streaming_pipe || data.src || data.depay || data.decoder || data.sink)){
    g_print("could not create pipeline elements");
    exit(1);
}

g_object_set(G_OBJECT(data.src), "location", "rtsp://xxxx/axis-media/media.amp", "latency", 0, NULL);
g_signal_connect(data.src, "pad-added", G_CALLBACK(on_rtsp_pad_added), &data);
//add and link elements to create full pipeline
gst_bin_add_many(GST_BIN(data.streaming_pipe), data.src, data.depay, data.decoder,  data.sink, NULL);
if(!gst_element_link_many(data.depay, data.decoder,  data.sink, NULL)){
    g_print("cannot link elements"); 
    exit(1);
}

pad = gst_element_get_static_pad (data.depay, "src");
if(pad == NULL){
    g_print("COULD NOT GET STATIC PAD");
}
gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER,
    (GstPadProbeCallback) cb_have_data, &data, NULL);
gst_object_unref (pad);

ret = gst_element_set_state (data.streaming_pipe, GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE) {
  g_printerr ("Unable to set the pipeline to the playing state.\n");
  gst_object_unref (data.streaming_pipe);
  return -1;
}



g_main_loop_run (data.m_loop);
}
like image 45
Peter Regier Avatar answered Oct 10 '22 00:10

Peter Regier