# -*- coding: utf-8 -*-
"""Generate samples with Gstreamer 'appsrc' element in 'push' mode.

Random float32 audio samples are pushed buffer by buffer into an
``appsrc`` element from the main Python thread while the GObject main loop
(which dispatches bus messages) runs in a background thread.  The pipeline
is ``appsrc ! audioconvert ! lame ! filesink`` and writes ``/tmp/test.mp3``.
"""

import pygst
pygst.require("0.10")
import gobject
import gst
import numpy
import threading

# Initialize gobject in a threading environment
gobject.threads_init()

# GStreamer main loop; it is run in a separate thread because buffers are
# "pushed" from outside the main loop (here: from the main Python thread).
mainloop = gobject.MainLoop()
mainloop_thread = threading.Thread(target=mainloop.run)

pipeline = gst.Pipeline("pipeline")

# Test signal: num_buffer chunks of buffer_size random float32 samples.
buffer_size = 4096
num_buffer = 10
array = numpy.random.randn(num_buffer * buffer_size, 1).astype('float32')

# Source element fed by the application; caps describe the raw samples.
appsrc = gst.element_factory_make('appsrc', 'appsrc')
capstr = ("audio/x-raw-float, width=32, depth=32, "
          "endianness=1234, rate=16000, channels=1")
caps = gst.caps_from_string(capstr)
print(caps)
appsrc.set_property("caps", caps)

converter = gst.element_factory_make('audioconvert', 'converter')
encoder = gst.element_factory_make('lame', 'encoder')
filesink = gst.element_factory_make('filesink', 'sink')
filesink.set_property('location', '/tmp/test.mp3')

pipeline.add(appsrc, converter, encoder, filesink)
gst.element_link_many(appsrc, converter, encoder, filesink)


def on_eos_cb(bus, msg):
    """Callback on End-Of-Stream message: stop the loop and the pipeline."""
    mainloop.quit()
    pipeline.set_state(gst.STATE_NULL)


def on_error_cb(bus, msg):
    """Callback on Error message: report the error and shut down."""
    err, debug_info = msg.parse_error()
    print("Error received from element %s: %s" % (msg.src.get_name(), err))
    print("Debugging information: %s" % debug_info)
    mainloop.quit()
    # Release the pipeline resources, as the EOS callback does.
    pipeline.set_state(gst.STATE_NULL)


# Watch the bus before starting so EOS/error messages reach the callbacks.
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect('message::eos', on_eos_cb)
bus.connect("message::error", on_error_cb)

pipeline.set_state(gst.STATE_PLAYING)
mainloop_thread.start()

# Push every chunk exactly once: chunk k covers samples
# [k * buffer_size, (k + 1) * buffer_size).
for k in range(num_buffer):
    print('push %d/%d' % (k + 1, num_buffer))
    buf = gst.Buffer(array[k * buffer_size:(k + 1) * buffer_size])
    appsrc.emit("push-buffer", buf)

# Tell appsrc that no more data will come so that EOS travels down the
# pipeline; on_eos_cb then quits the main loop and the output is finalized.
appsrc.emit("end-of-stream")

# Wait for the main loop thread to finish (after EOS or an error).
mainloop_thread.join()