
Dynamically (un)link elements in a running (GStreamer) pipeline?

There are plenty of examples in the GStreamer documentation on constructing and running static pipelines. However, there isn't much about changing or relinking elements in a live pipeline while the media is actually flowing. It's definitely possible, so the question is:

  1. What GStreamer concepts/mechanics should I understand before attempting this?
  2. Are there any pitfalls to watch out for?
  3. What is the basic procedure, or a good example?

The accepted answer will be spoon-fed, comprehensive, and include source code.

Jeremiah Rose asked Jun 19 '10 03:06


2 Answers

I tend to use the output-selector or input-selector element, depending on the situation, rather than deal with the complexity of pad blocking (I covered pad blocking in another post: http://gstreamer-devel.966125.n4.nabble.com/Dynamically-adding-and-removing-branches-of-a-tee-td973635.html#a4656812). When a branch is not in use, I connect the selector to a fakesrc or fakesink instead.

The example below switches between two image sinks once per second. If you are using GTK, you can replace the line g_timeout_add (SWITCH_TIMEOUT, switch_cb, osel); with a gtk_toggle_button and move the code that is currently in switch_cb into the toggle button's callback. In practice I would replace one of the image sinks with a fakesink to keep the pipeline running. That way I can later add a tee with a filesink to record the video, while still giving the player an option to turn the display on (selector on the imagesink) or off (selector on the fakesink). The selector thus lets you switch branches (bins) in and out at runtime.

#include <gst/gst.h>

#define SWITCH_TIMEOUT 1000
#define NUM_VIDEO_BUFFERS 500

static GMainLoop *loop;

/* Output selector src pads */
static GstPad *osel_src1 = NULL;
static GstPad *osel_src2 = NULL;

static gboolean
my_bus_callback (GstBus * bus, GstMessage * message, gpointer data)
{
  g_print ("Got %s message\n", GST_MESSAGE_TYPE_NAME (message));

  switch (GST_MESSAGE_TYPE (message)) {
    case GST_MESSAGE_ERROR:{
      GError *err;
      gchar *debug;

      gst_message_parse_error (message, &err, &debug);
      g_print ("Error: %s\n", err->message);
      g_error_free (err);
      g_free (debug);

      g_main_loop_quit (loop);
      break;
    }
    case GST_MESSAGE_EOS:
      /* end-of-stream */
      g_main_loop_quit (loop);
      break;
    default:
      /* unhandled message */
      break;
  }
  /* we want to be notified again the next time there is a message
   * on the bus, so returning TRUE (FALSE means we want to stop watching
   * for messages on the bus and our callback should not be called again)
   */
  return TRUE;
}

static gboolean
switch_cb (gpointer user_data)
{
  GstElement *sel = GST_ELEMENT (user_data);
  GstPad *old_pad, *new_pad = NULL;

  g_object_get (G_OBJECT (sel), "active-pad", &old_pad, NULL);

  if (old_pad == osel_src1)
    new_pad = osel_src2;
  else
    new_pad = osel_src1;

  g_object_set (G_OBJECT (sel), "active-pad", new_pad, NULL);

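  g_print ("switched from %s:%s to %s:%s\n", GST_DEBUG_PAD_NAME (old_pad),
      GST_DEBUG_PAD_NAME (new_pad));

  /* g_object_get returned a new reference (or NULL if no pad was active) */
  if (old_pad)
    gst_object_unref (old_pad);

  /* keep the timeout source installed so we switch again next interval */
  return TRUE;
}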

gint
main (gint argc, gchar * argv[])
{
  GstElement *pipeline, *src, *toverlay, *osel, *sink1, *sink2, *convert;
  GstPad *sinkpad1;
  GstPad *sinkpad2;
  GstBus *bus;

  /* init GStreamer */
  gst_init (&argc, &argv);
  loop = g_main_loop_new (NULL, FALSE);

  /* create elements */
  pipeline = gst_element_factory_make ("pipeline", "pipeline");
  src = gst_element_factory_make ("videotestsrc", "src");
  toverlay = gst_element_factory_make ("timeoverlay", "timeoverlay");
  osel = gst_element_factory_make ("output-selector", "osel");
  /* GStreamer 0.10 element name; in 1.0 the equivalent is "videoconvert" */
  convert = gst_element_factory_make ("ffmpegcolorspace", "convert");
  sink1 = gst_element_factory_make ("xvimagesink", "sink1");
  sink2 = gst_element_factory_make ("ximagesink", "sink2");

  if (!pipeline || !src || !toverlay || !osel || !convert || !sink1 || !sink2) {
    g_print ("missing element\n");
    return -1;
  }

  /* add them to bin */
  gst_bin_add_many (GST_BIN (pipeline), src, toverlay, osel, convert, sink1,
      sink2, NULL);

  /* set properties */
  g_object_set (G_OBJECT (src), "is-live", TRUE, NULL);
  g_object_set (G_OBJECT (src), "do-timestamp", TRUE, NULL);
  g_object_set (G_OBJECT (src), "num-buffers", NUM_VIDEO_BUFFERS, NULL);
  g_object_set (G_OBJECT (sink1), "sync", FALSE, "async", FALSE, NULL);
  g_object_set (G_OBJECT (sink2), "sync", FALSE, "async", FALSE, NULL);
  g_object_set (G_OBJECT (osel), "resend-latest", TRUE, NULL);

  /* link src ! timeoverlay ! osel */
  if (!gst_element_link_many (src, toverlay, osel, NULL)) {
    g_print ("linking failed\n");
    return -1;
  }

  /* link output 1 ("src%d" is the 0.10 pad template name; 1.0 uses "src_%u") */
  sinkpad1 = gst_element_get_static_pad (sink1, "sink");
  osel_src1 = gst_element_get_request_pad (osel, "src%d");
  if (gst_pad_link (osel_src1, sinkpad1) != GST_PAD_LINK_OK) {
    g_print ("linking output 1 failed\n");
    return -1;
  }
  gst_object_unref (sinkpad1);

  /* link output 2 */
  sinkpad2 = gst_element_get_static_pad (convert, "sink");
  osel_src2 = gst_element_get_request_pad (osel, "src%d");
  if (gst_pad_link (osel_src2, sinkpad2) != GST_PAD_LINK_OK) {
    g_print ("linking output 2 failed\n");
    return -1;
  }
  gst_object_unref (sinkpad2);

  if (!gst_element_link (convert, sink2)) {
    g_print ("linking convert to sink2 failed\n");
    return -1;
  }

  /* add switch callback */
  g_timeout_add (SWITCH_TIMEOUT, switch_cb, osel);

  /* watch the bus for error and EOS messages */
  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
  gst_bus_add_watch (bus, my_bus_callback, loop);
  gst_object_unref (bus);

  /* change to playing */
  gst_element_set_state (pipeline, GST_STATE_PLAYING);

  /* now run */
  g_main_loop_run (loop);

  /* also clean up */
  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_element_release_request_pad (osel, osel_src1);
  gst_element_release_request_pad (osel, osel_src2);
  gst_object_unref (GST_OBJECT (pipeline));

  return 0;
}
enthusiasticgeek answered Sep 16 '22 12:09


  1. My favorite "concept" for understanding linking (and dynamic linking) is to think of the pipeline as a real pipe with water streaming through it. Once you do this, some things become very obvious. "Do you set the source to PLAYING before linking the element?" becomes "Do you turn on the water before connecting the hose?", and it more or less answers itself. This matters even more with dynamic linking: how do you ensure that no water "leaks" (a "leak" in GStreamer is the equivalent of getting a GST_FLOW_NOT_LINKED, which will stop your source and the fun) and that nothing gets clogged up (which can cause dropped or congested packets)?

  2. Yes. Many. With the disclaimer that I still work with 0.10 and some of this might have been fixed in 1.0: it is unfortunately very, very hard to do dynamic linking and unlinking with GStreamer 0.10. Let me explain. Say you are using a tee and you want to unlink one branch. You would start by releasing the tee's src pad (never mind unlinking it; that happens as part of releasing the pad), and now you should be able to safely tear down the elements downstream from that pad. (The water equivalent is that you close a valve after the tee and can then dismantle the pipes after the valve. You would not start dismantling the pipes without closing the valve first unless you wanted to get wet...) This will work most of the time, but there is a race here. After you have released the pad, there might still be a push or a pad-alloc on its way on that pad. If your code now starts tearing down the downstream elements, it might crash, because some elements have a race if they get a push or pad-alloc while being torn down. Alternatively you get a GST_FLOW_WRONG_STATE or GST_FLOW_NOT_LINKED, which travels back to the source and stops the stream for everyone...

  3. I did a lot of experiments with this and found that if you need stability, and occasional crashing or freezing is not an option, you need an element that serves as your dynamic safety net: an element that guarantees that absolutely no activity will happen on a pad after you release/unlink it. The only way to do this is to break another GStreamer paradigm, that of not pushing while holding a lock: you need to hold a lock while pushing, sending events, and pad-allocing. I made such a thing a while back here. (The test case is the most important part, of course, as it allows you to test your own and other elements for their safety.) You could also imagine a lock-free element that swallows all bad FlowReturns and just paints a pretty picture for its upstream, but then you would need to be absolutely sure that all your downstream elements are "push or pad-alloc received while shutting down"-safe, since your element would not be able to guarantee that, once "stop the flow" (release/unlink) has been executed, a little drop would not squeeze past.
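
The locking idea in point 3 can be illustrated with a toy model in plain C and pthreads. This is only a sketch of the principle, not GStreamer code and not the element mentioned above: ToyValve, ToyBranch and every toy_* function are hypothetical names invented for this illustration. The valve holds its mutex for the whole duration of a push, so once toy_valve_unlink() returns, no push can be in flight or start later, and the branch can be torn down safely.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_PUSHES 200000

/* Toy stand-in for the downstream branch; NOT a GStreamer type. */
typedef struct {
  bool torn_down;     /* set once the application has dismantled the branch */
  long buffers_seen;  /* buffers delivered while the branch was alive */
  long late_pushes;   /* buffers that arrived after teardown (the bug) */
} ToyBranch;

/* Toy "safety-net" valve: its lock is held for the whole push. */
typedef struct {
  pthread_mutex_t lock;
  ToyBranch *downstream;  /* NULL once unlinked */
  long dropped;           /* buffers swallowed after the unlink */
} ToyValve;

static void toy_branch_receive(ToyBranch *b) {
  if (b->torn_down)
    b->late_pushes++;
  else
    b->buffers_seen++;
}

void toy_valve_init(ToyValve *v, ToyBranch *downstream) {
  pthread_mutex_init(&v->lock, NULL);
  v->downstream = downstream;
  v->dropped = 0;
}

/* Streaming thread side. Always reports success, so upstream never
 * sees a "not linked" style error after the branch is gone. */
bool toy_valve_push(ToyValve *v) {
  pthread_mutex_lock(&v->lock);
  if (v->downstream != NULL)
    toy_branch_receive(v->downstream);  /* push while holding the lock */
  else
    v->dropped++;
  pthread_mutex_unlock(&v->lock);
  return true;
}

/* Application thread side. When this returns, no push is in flight
 * and none can start, so the caller may tear the old branch down. */
ToyBranch *toy_valve_unlink(ToyValve *v) {
  pthread_mutex_lock(&v->lock);
  ToyBranch *old = v->downstream;
  v->downstream = NULL;
  pthread_mutex_unlock(&v->lock);
  return old;
}

static void *toy_streaming_thread(void *data) {
  ToyValve *v = data;
  for (int i = 0; i < TOY_PUSHES; i++)
    toy_valve_push(v);
  return NULL;
}

/* One unlink-while-streaming cycle. Returns the number of pushes that
 * reached the branch after teardown, or -1 if the thread failed to start. */
long toy_run_once(void) {
  ToyBranch branch = { false, 0, 0 };
  ToyValve valve;
  pthread_t thread;

  toy_valve_init(&valve, &branch);
  if (pthread_create(&thread, NULL, toy_streaming_thread, &valve) != 0)
    return -1;

  ToyBranch *old = toy_valve_unlink(&valve);
  assert(old == &branch);
  old->torn_down = true;  /* safe: nothing can reach the branch any more */

  pthread_join(thread, NULL);
  pthread_mutex_destroy(&valve.lock);
  return branch.late_pushes;
}
```

Calling toy_run_once() in a loop is a crude way to hunt for the rare race described above; with the lock held during the push it should always return 0. The trade-off is the one already named: pushing while holding a lock goes against usual GStreamer practice, and in a real element it can risk a deadlock if downstream ever calls back into the locked path.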

Of course, you have to put some of this in perspective. The window for these terrible race conditions is in fact very, very small, and might only be hit every 1,000th or 10,000th time you run your program. But for a professional application this is of course not acceptable. I did a talk where I covered some of this stuff here.

Havard Graff answered Sep 19 '22 12:09