Skip to content

Instantly share code, notes, and snippets.

@tylercubell
Created February 11, 2019 01:07
Show Gist options
  • Select an option

  • Save tylercubell/14cf51a40c517e12c102c8f77831ee80 to your computer and use it in GitHub Desktop.

Select an option

Save tylercubell/14cf51a40c517e12c102c8f77831ee80 to your computer and use it in GitHub Desktop.

Revisions

  1. tylercubell created this gist Feb 11, 2019.
    141 changes: 141 additions & 0 deletions automatically_restart_live_stream.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,141 @@
    import gi
    gi.require_version('Gst', '1.0')
    from gi.repository import GObject, Gst

    Gst.init(None)

    class Main:
    def __init__(self):
    self.pipeline = Gst.Pipeline.new("pipeline")
    self.bus = self.pipeline.get_bus()

    # Test HLS stream.
    self.uri = "http://qthttp.apple.com.edgesuite.net/1010qwoeiuryfg/sl.m3u8"

    self.souphttpsrc = Gst.ElementFactory.make("souphttpsrc", "souphttpsrc")
    self.souphttpsrc.set_property("is-live", True)
    self.souphttpsrc.set_property("location", self.uri)
    self.pipeline.add(self.souphttpsrc)

    self.decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
    self.pipeline.add(self.decodebin)
    self.decodebin.connect("pad-added", self.decodebin_src_pad_created)

    self.audioconvert = Gst.ElementFactory.make("audioconvert", "audioconvert")
    self.audioconvert_sink_pad = self.audioconvert.get_static_pad("sink")
    self.pipeline.add(self.audioconvert)

    self.audioresample = Gst.ElementFactory.make("audioresample", "audioresample")
    self.pipeline.add(self.audioresample)

    self.autovideosink = Gst.ElementFactory.make("autovideosink", "autovideosink")
    self.autovideosink_sink_pad = self.autovideosink.get_static_pad("sink")
    self.pipeline.add(self.autovideosink)

    self.autoaudiosink = Gst.ElementFactory.make("autoaudiosink", "autoaudiosink")
    self.pipeline.add(self.autoaudiosink)

    self.souphttpsrc.link(self.decodebin)
    self.audioconvert.link(self.audioresample)
    self.audioresample.link(self.autoaudiosink)

    def decodebin_src_pad_created(self, element, pad):
    pad_type = pad.get_current_caps().get_structure(0).get_name()

    if pad_type == "audio/x-raw" and not self.audioconvert_sink_pad.is_linked():
    pad.link(self.audioconvert_sink_pad)

    if pad_type == "video/x-raw" and not self.autovideosink_sink_pad.is_linked():
    pad.link(self.autovideosink_sink_pad)

    def custom_message(self, name):
    custom_structure = Gst.Structure.new_empty(name)
    custom_message = Gst.Message.new_application(None, custom_structure)
    self.bus.post(custom_message)

    def start_switch(self):
    if not self.start_switch_active:
    self.start_switch_active = True

    self.souphttpsrc.get_static_pad("src").add_probe(Gst.PadProbeType.BLOCK, \
    self.souphttpsrc_block_probe_callback)

    if self.decodebin.get_static_pad("src_0"):
    self.decodebin_block_probe_id = \
    self.decodebin.get_static_pad("src_0").add_probe(Gst.PadProbeType.BLOCK, \
    self.decodebin_block_probe_callback)

    self.custom_message("do_switch")

    def souphttpsrc_block_probe_callback(self, pad, info):
    return Gst.PadProbeReturn.DROP

    def decodebin_block_probe_callback(self, pad, info):
    return Gst.PadProbeReturn.DROP

    def do_switch(self):
    # Get ready to add/remove elements.
    self.pipeline.set_state(Gst.State.READY)

    # Remove old souphttpsrc.
    self.souphttpsrc.set_state(Gst.State.NULL)
    self.souphttpsrc.unlink(self.decodebin)
    self.pipeline.remove(self.souphttpsrc)
    del self.souphttpsrc

    # Remove old decodebin.
    self.decodebin.set_state(Gst.State.NULL)
    self.decodebin.unlink(self.autovideosink)
    self.decodebin.unlink(self.audioconvert)
    self.pipeline.remove(self.decodebin)
    del self.decodebin

    # Reset pipeline time so stream can play from the beginning.
    self.pipeline.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH, 0)

    # Add new decodebin.
    self.decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
    self.pipeline.add(self.decodebin)
    self.decodebin.connect("pad-added", self.decodebin_src_pad_created)

    # Add new souphttpsrc.
    self.souphttpsrc = Gst.ElementFactory.make("souphttpsrc", "souphttpsrc")
    self.souphttpsrc.set_property("is-live", True)
    self.souphttpsrc.set_property("location", self.uri)
    self.pipeline.add(self.souphttpsrc)
    self.souphttpsrc.link(self.decodebin)

    # Play stream.
    self.pipeline.set_state(Gst.State.PLAYING)

    self.start_switch_active = False

    def run(self):
    self.start_switch_active = False
    self.pipeline.set_state(Gst.State.PLAYING)

    while True:
    try:
    message = self.bus.timed_pop(Gst.SECOND)

    if message == None:
    pass
    elif message.type == Gst.MessageType.APPLICATION:
    if message.get_structure().get_name() == "start_switch":
    if not self.start_switch_active:
    self.start_switch()
    elif message.get_structure().get_name() == "do_switch":
    self.do_switch()
    elif message.type == Gst.MessageType.EOS:
    break
    elif message.type == Gst.MessageType.ERROR:
    # Assumes error comes from souphttpsrc/decodebin for this example.
    # Can add filter using message.src.get_name()/message.src.get_parent().
    self.custom_message("start_switch")
    except KeyboardInterrupt:
    break

    self.pipeline.set_state(Gst.State.NULL)

    start = Main()
    start.run()