import pyo64 as pyo
import threading  # for the sound generation thread

try:
    import queue  # Python 3
except ImportError:  # Python 2
    import Queue as queue

s = pyo.Server()
s.boot()

fifo = pyo.FIFOPlayer(maxsize=100, mul=.3)
fifo.out()

# Play around with n to hear the same sound with different buffer sizes.
# It should not be larger than 256*100, because then a single float would be
# in every buffer we put into the fifo. On my machine, I can increase n to
# around 2560, which is insane since each numpy array only contains 10 floats.
# If you can afford it, make the arrays you put into the fifo a multiple of
# s.getBufferSize().
n = 100


def proc(fifo, n, stopevent):
    """Generate a cosine sweep and feed it to ``fifo`` in ``n`` chunks.

    Runs until ``stopevent`` is set. On every pass a buffer of 256*100
    samples is computed, cut into ``n`` equally sized slices and each slice
    is handed to ``fifo.put()``. After each buffer the frequency is moved by
    ``inc``; the step direction flips once the frequency goes above 700 or
    below 500.

    Args:
        fifo: Object providing ``put()``; here the FIFOPlayer created above.
        n: Number of slices each generated buffer is cut into.
        stopevent: ``threading.Event`` used to request termination.
    """
    import numpy as np

    # NOTE(review): `pyo_use_double` is not defined in this file; it is
    # presumably made available by the pyo64 import -- confirm.
    dtype = np.float64 if pyo_use_double else np.float32

    freq = 500
    inc = 10
    while not stopevent.wait(0):
        # np.sin results in click sounds, so use np.cos (zero-crossings)
        a = np.cos(
            np.linspace(0, 2*np.pi*freq, num=256*100)
        ).astype(dtype)

        # We could just put the array `a` into fifo.put(a), which would be
        # faster, but for testing and debugging we want to cut `a` into
        # smaller arrays and pump them into the fifo. The sound should be
        # the same.
        # `range` (instead of the Python-2-only `xrange`) keeps this loop
        # working on both Python 2 and Python 3.
        for i in range(n):
            if stopevent.wait(0):
                break
            chunk = a.size // n
            fifo.put(a[i*chunk:(i+1)*chunk])

        freq += inc
        if freq > 700:
            inc = -10
        elif freq < 500:
            inc = 10
        print("qsize: {}".format(fifo._base_objs[0]._queue.qsize()))


# Don't use Pattern, since it will be called by the server in the same
# thread as the server and block if the queue gets over maxsize.
#pat = pyo.Pattern(function=proc, time=0.2).play()

# Instead we just generate the data as fast as we can in a second thread
# and let it wait (i.e. block) when the queue reaches maxsize. The thread
# will continue when the server has consumed an item from the queue.
stopevent = threading.Event()
producer = threading.Thread(name="Compute audio signal", target=proc,
                            args=[fifo, n, stopevent])
producer.start()

s.gui(locals())

# Remember, the producer may be blocked at fifo.put, because the queue is full,
# so we set the stopevent and additionally, we have to remove one item from
# the queue, so that the last fifo.put() call returns.
stopevent.set()
try:
    fifo._base_objs[0]._queue.get_nowait()
except queue.Empty:
    # Nothing was queued, so the producer cannot be blocked in put(); it
    # will see the stop event on its own.
    # NOTE(review): assumes `_queue` is a standard-library queue -- confirm.
    pass