import gi gi.require_version('Gst', '1.0') from gi.repository import GObject, Gst Gst.init(None) class Main: def __init__(self): self.pipeline = Gst.Pipeline.new("pipeline") self.bus = self.pipeline.get_bus() # Test HLS stream. self.uri = "http://qthttp.apple.com.edgesuite.net/1010qwoeiuryfg/sl.m3u8" self.souphttpsrc = Gst.ElementFactory.make("souphttpsrc", "souphttpsrc") self.souphttpsrc.set_property("is-live", True) self.souphttpsrc.set_property("location", self.uri) self.pipeline.add(self.souphttpsrc) self.decodebin = Gst.ElementFactory.make("decodebin", "decodebin") self.pipeline.add(self.decodebin) self.decodebin.connect("pad-added", self.decodebin_src_pad_created) self.audioconvert = Gst.ElementFactory.make("audioconvert", "audioconvert") self.audioconvert_sink_pad = self.audioconvert.get_static_pad("sink") self.pipeline.add(self.audioconvert) self.audioresample = Gst.ElementFactory.make("audioresample", "audioresample") self.pipeline.add(self.audioresample) self.autovideosink = Gst.ElementFactory.make("autovideosink", "autovideosink") self.autovideosink_sink_pad = self.autovideosink.get_static_pad("sink") self.pipeline.add(self.autovideosink) self.autoaudiosink = Gst.ElementFactory.make("autoaudiosink", "autoaudiosink") self.pipeline.add(self.autoaudiosink) self.souphttpsrc.link(self.decodebin) self.audioconvert.link(self.audioresample) self.audioresample.link(self.autoaudiosink) def decodebin_src_pad_created(self, element, pad): pad_type = pad.get_current_caps().get_structure(0).get_name() if pad_type == "audio/x-raw" and not self.audioconvert_sink_pad.is_linked(): pad.link(self.audioconvert_sink_pad) if pad_type == "video/x-raw" and not self.autovideosink_sink_pad.is_linked(): pad.link(self.autovideosink_sink_pad) def custom_message(self, name): custom_structure = Gst.Structure.new_empty(name) custom_message = Gst.Message.new_application(None, custom_structure) self.bus.post(custom_message) def start_switch(self): if not self.start_switch_active: self.start_switch_active = True self.souphttpsrc.get_static_pad("src").add_probe(Gst.PadProbeType.BLOCK, \ self.souphttpsrc_block_probe_callback) if self.decodebin.get_static_pad("src_0"): self.decodebin_block_probe_id = \ self.decodebin.get_static_pad("src_0").add_probe(Gst.PadProbeType.BLOCK, \ self.decodebin_block_probe_callback) self.custom_message("do_switch") def souphttpsrc_block_probe_callback(self, pad, info): return Gst.PadProbeReturn.DROP def decodebin_block_probe_callback(self, pad, info): return Gst.PadProbeReturn.DROP def do_switch(self): # Get ready to add/remove elements. self.pipeline.set_state(Gst.State.READY) # Remove old souphttpsrc. self.souphttpsrc.set_state(Gst.State.NULL) self.souphttpsrc.unlink(self.decodebin) self.pipeline.remove(self.souphttpsrc) del self.souphttpsrc # Remove old decodebin. self.decodebin.set_state(Gst.State.NULL) self.decodebin.unlink(self.autovideosink) self.decodebin.unlink(self.audioconvert) self.pipeline.remove(self.decodebin) del self.decodebin # Reset pipeline time so stream can play from the beginning. self.pipeline.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH, 0) # Add new decodebin. self.decodebin = Gst.ElementFactory.make("decodebin", "decodebin") self.pipeline.add(self.decodebin) self.decodebin.connect("pad-added", self.decodebin_src_pad_created) # Add new souphttpsrc. self.souphttpsrc = Gst.ElementFactory.make("souphttpsrc", "souphttpsrc") self.souphttpsrc.set_property("is-live", True) self.souphttpsrc.set_property("location", self.uri) self.pipeline.add(self.souphttpsrc) self.souphttpsrc.link(self.decodebin) # Play stream. self.pipeline.set_state(Gst.State.PLAYING) self.start_switch_active = False def run(self): self.start_switch_active = False self.pipeline.set_state(Gst.State.PLAYING) while True: try: message = self.bus.timed_pop(Gst.SECOND) if message == None: pass elif message.type == Gst.MessageType.APPLICATION: if message.get_structure().get_name() == "start_switch": if not self.start_switch_active: self.start_switch() elif message.get_structure().get_name() == "do_switch": self.do_switch() elif message.type == Gst.MessageType.EOS: break elif message.type == Gst.MessageType.ERROR: # Assumes error comes from souphttpsrc/decodebin for this example. # Can add filter using message.src.get_name()/message.src.get_parent(). self.custom_message("start_switch") except KeyboardInterrupt: break self.pipeline.set_state(Gst.State.NULL) start = Main() start.run()