# License: BSD 3-clause
# Authors: Kyle Kastner
from __future__ import division
import numpy as np
"""
# TODO: Is this fixed or hosed
# Do numpy version check for <= 1.9 ! Something crazy going on in 1.10
if int(np.__version__.split(".")[1]) >= 10:
    print("Only numpy <= 1.9 currently supported! There is a "
          "numerical error in one of the numpy 1.10 routines. "
          "Hopefully this will be debugged and corrected soon. "
          "For the intrepid, the error can be seen by running "
          "run_phase_reconstruction()")
"""
from numpy.lib.stride_tricks import as_strided
import scipy.signal as sg
from scipy.interpolate import interp1d
import wave
from scipy.cluster.vq import vq
from scipy import linalg, fftpack
from numpy.testing import assert_almost_equal
from scipy.linalg import svd
from scipy.io import wavfile
from scipy.signal import firwin
import zipfile
import tarfile
import os
import copy
try:
    import urllib.request as urllib  # for backwards compatibility
except ImportError:
    import urllib2 as urllib


def download(url, server_fname, local_fname=None,
             progress_update_percentage=5, bypass_certificate_check=False):
    """
    An internet download utility modified from
    http://stackoverflow.com/questions/22676/
    how-do-i-download-a-file-over-http-using-python/22776#22776
    """
    if bypass_certificate_check:
        import ssl
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        u = urllib.urlopen(url, context=ctx)
    else:
        u = urllib.urlopen(url)
    if local_fname is None:
        local_fname = server_fname
    full_path = local_fname
    meta = u.info()
    with open(full_path, 'wb') as f:
        try:
            file_size = int(meta.get("Content-Length"))
        except TypeError:
            print("WARNING: Cannot get file size, displaying bytes instead!")
            file_size = 100
        print("Downloading: %s Bytes: %s" % (server_fname, file_size))
        file_size_dl = 0
        block_sz = int(1E7)
        p = 0
        while True:
            buffer = u.read(block_sz)
            if not buffer:
                break
            file_size_dl += len(buffer)
            f.write(buffer)
            if (file_size_dl * 100. / file_size) > p:
                status = r"%10d [%3.2f%%]" % (
                    file_size_dl, file_size_dl * 100. / file_size)
                print(status)
                p += progress_update_percentage


def fetch_sample_speech_tapestry():
    url = "https://www.dropbox.com/s/qte66a7haqspq2g/tapestry.wav?dl=1"
    wav_path = "tapestry.wav"
    if not os.path.exists(wav_path):
        download(url, wav_path)
    fs, d = wavfile.read(wav_path)
    d = d.astype('float32') / (2 ** 15)
    # file is stereo?
- just choose one channel return fs, d def fetch_sample_file(wav_path): if not os.path.exists(wav_path): raise ValueError("Unable to find file at path %s" % wav_path) fs, d = wavfile.read(wav_path) d = d.astype('float32') / (2 ** 15) # file is stereo - just choose one channel if len(d.shape) > 1: d = d[:, 0] return fs, d def fetch_sample_music(): url = "http://www.music.helsinki.fi/tmt/opetus/uusmedia/esim/" url += "a2002011001-e02-16kHz.wav" wav_path = "test.wav" if not os.path.exists(wav_path): download(url, wav_path) fs, d = wavfile.read(wav_path) d = d.astype('float32') / (2 ** 15) # file is stereo - just choose one channel d = d[:, 0] return fs, d def fetch_sample_speech_fruit(n_samples=None): url = 'https://dl.dropboxusercontent.com/u/15378192/audio.tar.gz' wav_path = "audio.tar.gz" if not os.path.exists(wav_path): download(url, wav_path) tf = tarfile.open(wav_path) wav_names = [fname for fname in tf.getnames() if ".wav" in fname.split(os.sep)[-1]] speech = [] print("Loading speech files...") for wav_name in wav_names[:n_samples]: f = tf.extractfile(wav_name) fs, d = wavfile.read(f) d = d.astype('float32') / (2 ** 15) speech.append(d) return fs, speech def fetch_sample_speech_eustace(n_samples=None): """ http://www.cstr.ed.ac.uk/projects/eustace/download.html """ # data url = "http://www.cstr.ed.ac.uk/projects/eustace/down/eustace_wav.zip" wav_path = "eustace_wav.zip" if not os.path.exists(wav_path): download(url, wav_path) # labels url = "http://www.cstr.ed.ac.uk/projects/eustace/down/eustace_labels.zip" labels_path = "eustace_labels.zip" if not os.path.exists(labels_path): download(url, labels_path) # Read wavfiles # 16 kHz wav zf = zipfile.ZipFile(wav_path, 'r') wav_names = [fname for fname in zf.namelist() if ".wav" in fname.split(os.sep)[-1]] fs = 16000 speech = [] print("Loading speech files...") for wav_name in wav_names[:n_samples]: wav_str = zf.read(wav_name) d = np.frombuffer(wav_str, dtype=np.int16) d = d.astype('float32') / (2 ** 15) speech.append(d) zf = zipfile.ZipFile(labels_path, 'r') label_names = [fname for fname in zf.namelist() if ".lab" in fname.split(os.sep)[-1]] labels = [] print("Loading label files...") for label_name in label_names[:n_samples]: label_file_str = zf.read(label_name) labels.append(label_file_str) return fs, speech def stft(X, fftsize=128, step="half", mean_normalize=True, real=False, compute_onesided=True): """ Compute STFT for 1D real valued input X """ if real: local_fft = fftpack.rfft cut = -1 else: local_fft = fftpack.fft cut = None if compute_onesided: cut = fftsize // 2 + 1 if mean_normalize: X -= X.mean() if step == "half": X = halfoverlap(X, fftsize) else: X = overlap(X, fftsize, step) size = fftsize win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1)) X = X * win[None] X = local_fft(X)[:, :cut] return X def istft(X, fftsize=128, step="half", wsola=False, mean_normalize=True, real=False, compute_onesided=True): """ Compute ISTFT for STFT transformed X """ if real: local_ifft = fftpack.irfft X_pad = np.zeros((X.shape[0], X.shape[1] + 1)) + 0j X_pad[:, :-1] = X X = X_pad else: local_ifft = fftpack.ifft if compute_onesided: X_pad = np.zeros((X.shape[0], 2 * X.shape[1])) + 0j X_pad[:, :fftsize // 2 + 1] = X X_pad[:, fftsize // 2 + 1:] = 0 X = X_pad X = local_ifft(X).astype("float64") if step == "half": X = invert_halfoverlap(X) else: X = overlap_add(X, step, wsola=wsola) if mean_normalize: X -= np.mean(X) return X def mdct_slow(X, dctsize=128): M = dctsize N = 2 * dctsize N_0 = (M + 1) / 2 X = halfoverlap(X, N) X = 
sine_window(X) n, k = np.meshgrid(np.arange(N), np.arange(M)) # Use transpose due to "samples as rows" convention tf = np.cos(np.pi * (n + N_0) * (k + 0.5) / M).T return np.dot(X, tf) def imdct_slow(X, dctsize=128): M = dctsize N = 2 * dctsize N_0 = (M + 1) / 2 N_4 = N / 4 n, k = np.meshgrid(np.arange(N), np.arange(M)) # inverse *is not* transposed tf = np.cos(np.pi * (n + N_0) * (k + 0.5) / M) X_r = np.dot(X, tf) / N_4 X_r = sine_window(X_r) X = invert_halfoverlap(X_r) return X def nsgcwin(fmin, fmax, n_bins, fs, signal_len, gamma): """ Nonstationary Gabor window calculation References ---------- Velasco G. A., Holighaus N., Dorfler M., Grill T. Constructing an invertible constant-Q transform with nonstationary Gabor frames, Proceedings of the 14th International Conference on Digital Audio Effects (DAFx 11), Paris, France, 2011 Holighaus N., Dorfler M., Velasco G. A. and Grill T. A framework for invertible, real-time constant-Q transforms, submitted. Original matlab code copyright follows: AUTHOR(s) : Monika Dorfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011 COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA http://nuhag.eu/ Permission is granted to modify and re-distribute this code in any manner as long as this notice is preserved. All standard disclaimers apply. """ # use a hanning window # no fractional shifts fftres = fs / signal_len fmin = float(fmin) fmax = float(fmax) gamma = float(gamma) nyq = fs / 2. b = np.floor(n_bins * np.log2(fmax / fmin)) fbas = fmin * 2 ** (np.arange(b + 1) / float(n_bins)) Q = 2 ** (1. / n_bins) - 2 ** (-1. / n_bins) cqtbw = Q * fbas + gamma cqtbw = cqtbw.ravel() maxidx = np.where(fbas + cqtbw / 2. > nyq)[0] if len(maxidx) > 0: # replicate bug in MATLAB version... # or is it a feature if sum(maxidx) == 0: first = len(cqtbw) - 1 else: first = maxidx[0] fbas = fbas[:first] cqtbw = cqtbw[:first] minidx = np.where(fbas - cqtbw / 2. < 0)[0] if len(minidx) > 0: fbas = fbas[minidx[-1]+1:] cqtbw = cqtbw[minidx[-1]+1:] fbas_len = len(fbas) fbas_new = np.zeros((2 * (len(fbas) + 1))) fbas_new[1:len(fbas) + 1] = fbas fbas = fbas_new fbas[fbas_len + 1] = nyq fbas[fbas_len + 2:] = fs - fbas[1:fbas_len + 1][::-1] bw = np.zeros_like(fbas) bw[0] = 2 * fmin bw[1:len(cqtbw) + 1] = cqtbw bw[len(cqtbw) + 1] = fbas[fbas_len + 2] - fbas[fbas_len] bw[-len(cqtbw):] = cqtbw[::-1] bw = bw / fftres fbas = fbas / fftres posit = np.zeros_like(fbas) posit[:fbas_len + 2] = np.floor(fbas[:fbas_len + 2]) posit[fbas_len + 2:] = np.ceil(fbas[fbas_len + 2:]) base_shift = -posit[-1] % signal_len shift = np.zeros_like(posit).astype("int32") shift[1:] = (posit[1:] - posit[:-1]).astype("int32") shift[0] = base_shift bw = np.round(bw) bwfac = 1 M = bw min_win = 4 for ii in range(len(bw)): if bw[ii] < min_win: bw[ii] = min_win M[ii] = bw[ii] def _win(numel): if numel % 2 == 0: s1 = np.arange(0, .5, 1. / numel) if len(s1) != numel // 2: # edge case with small floating point numbers... s1 = s1[:-1] s2 = np.arange(-.5, 0, 1. / numel) if len(s2) != numel // 2: # edge case with small floating point numbers... s2 = s2[:-1] x = np.concatenate((s1, s2)) else: s1 = np.arange(0, .5, 1. / numel) s2 = np.arange(-.5 + .5 / numel, 0, 1. 
/ numel)
            if len(s2) != numel // 2:
                # assume integer truncate 27 // 2 = 13
                s2 = s2[:-1]
            x = np.concatenate((s1, s2))
        assert len(x) == numel
        g = .5 + .5 * np.cos(2 * np.pi * x)
        return g

    multiscale = [_win(bi) for bi in bw]
    bw = bwfac * np.ceil(M / bwfac)
    for kk in [0, fbas_len + 1]:
        if M[kk] > M[kk + 1]:
            multiscale[kk] = np.ones(int(M[kk])).astype(multiscale[0].dtype)
            # cast to int so the slice below stays valid
            i1 = int(np.floor(M[kk] / 2) - np.floor(M[kk + 1] / 2))
            i2 = int(np.floor(M[kk] / 2) + np.ceil(M[kk + 1] / 2))
            # Very rarely, gets an off by 1 error? Seems to be at the end...
            # for now, slice
            multiscale[kk][i1:i2] = _win(M[kk + 1])
            multiscale[kk] = multiscale[kk] / np.sqrt(M[kk])
    return multiscale, shift, M


def nsgtf_real(X, multiscale, shift, window_lens):
    """
    Nonstationary Gabor Transform for real values

    References
    ----------
    Velasco G. A., Holighaus N., Dorfler M., Grill T.
    Constructing an invertible constant-Q transform with nonstationary
    Gabor frames, Proceedings of the 14th International Conference on
    Digital Audio Effects (DAFx 11), Paris, France, 2011

    Holighaus N., Dorfler M., Velasco G. A. and Grill T.
    A framework for invertible, real-time constant-Q transforms, submitted.

    Original matlab code copyright follows:

    AUTHOR(s) : Monika Dorfler, Gino Angelo Velasco, Nicki Holighaus,
    2010-2011

    COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA
    http://nuhag.eu/
    Permission is granted to modify and re-distribute this
    code in any manner as long as this notice is preserved.
    All standard disclaimers apply.
    """
    # This will break with multichannel input
    signal_len = len(X)
    N = len(shift)
    X_fft = np.fft.fft(X)

    fill = np.sum(shift) - signal_len
    if fill > 0:
        # pad the spectrum out to signal_len + fill bins, keeping the
        # complex dtype
        X_fft_tmp = np.zeros((signal_len + fill,), dtype=X_fft.dtype)
        X_fft_tmp[:len(X_fft)] = X_fft
        X_fft = X_fft_tmp
    posit = np.cumsum(shift) - shift[0]
    scale_lens = np.array([len(m) for m in multiscale])
    N = np.where(posit - np.floor(scale_lens) <= (signal_len + fill) / 2)[0][-1]
    c = []
    # c[0] is almost exact
    for ii in range(N):
        idx_l = np.arange(np.ceil(scale_lens[ii] / 2), scale_lens[ii])
        idx_r = np.arange(np.ceil(scale_lens[ii] / 2))
        idx = np.concatenate((idx_l, idx_r))
        idx = idx.astype("int32")
        subwin_range = posit[ii] + np.arange(-np.floor(scale_lens[ii] / 2),
                                             np.ceil(scale_lens[ii] / 2))
        win_range = subwin_range % (signal_len + fill)
        win_range = win_range.astype("int32")
        if window_lens[ii] < scale_lens[ii]:
            raise ValueError("Not handling 'not enough channels' case")
        else:
            temp = np.zeros((window_lens[ii],)).astype(X_fft.dtype)
            temp_idx_l = np.arange(len(temp) - np.floor(scale_lens[ii] / 2),
                                   len(temp))
            temp_idx_r = np.arange(np.ceil(scale_lens[ii] / 2))
            temp_idx = np.concatenate((temp_idx_l, temp_idx_r))
            temp_idx = temp_idx.astype("int32")
            temp[temp_idx] = X_fft[win_range] * multiscale[ii][idx]
        fs_new_bins = window_lens[ii]
        fk_bins = posit[ii]
        displace = fk_bins - np.floor(fk_bins / fs_new_bins) * fs_new_bins
        displace = displace.astype("int32")
        temp = np.roll(temp, displace)
        c.append(np.fft.ifft(temp))

    if 0:
        # cell2mat concatenation
        c = np.concatenate(c)
    return c


def nsdual(multiscale, shift, window_lens):
    """
    Calculation of nonstationary inverse gabor filters

    References
    ----------
    Velasco G. A., Holighaus N., Dorfler M., Grill T.
    Constructing an invertible constant-Q transform with nonstationary
    Gabor frames, Proceedings of the 14th International Conference on
    Digital Audio Effects (DAFx 11), Paris, France, 2011

    Holighaus N., Dorfler M., Velasco G. A. and Grill T.
    A framework for invertible, real-time constant-Q transforms, submitted.
Original matlab code copyright follows: AUTHOR(s) : Monika Dorfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011 COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA http://nuhag.eu/ Permission is granted to modify and re-distribute this code in any manner as long as this notice is preserved. All standard disclaimers apply. """ N = len(shift) posit = np.cumsum(shift) seq_len = posit[-1] posit = posit - shift[0] diagonal = np.zeros((seq_len,)) win_range = [] for ii in range(N): filt_len = len(multiscale[ii]) idx = np.arange(-np.floor(filt_len / 2), np.ceil(filt_len / 2)) win_range.append((posit[ii] + idx) % seq_len) subdiag = window_lens[ii] * np.fft.fftshift(multiscale[ii]) ** 2 ind = win_range[ii].astype(np.int) diagonal[ind] = diagonal[ind] + subdiag dual_multiscale = multiscale for ii in range(N): ind = win_range[ii].astype(np.int) dual_multiscale[ii] = np.fft.ifftshift( np.fft.fftshift(dual_multiscale[ii]) / diagonal[ind]) return dual_multiscale def nsgitf_real(c, c_dc, c_nyq, multiscale, shift): """ Nonstationary Inverse Gabor Transform on real valued signal References ---------- Velasco G. A., Holighaus N., Dorfler M., Grill T. Constructing an invertible constant-Q transform with nonstationary Gabor frames, Proceedings of the 14th International Conference on Digital Audio Effects (DAFx 11), Paris, France, 2011 Holighaus N., Dorfler M., Velasco G. A. and Grill T. A framework for invertible, real-time constant-Q transforms, submitted. Original matlab code copyright follows: AUTHOR(s) : Monika Dorfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011 COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA http://nuhag.eu/ Permission is granted to modify and re-distribute this code in any manner as long as this notice is preserved. All standard disclaimers apply. """ c_l = [] c_l.append(c_dc) c_l.extend([ci for ci in c]) c_l.append(c_nyq) posit = np.cumsum(shift) seq_len = posit[-1] posit -= shift[0] out = np.zeros((seq_len,)).astype(c_l[1].dtype) for ii in range(len(c_l)): filt_len = len(multiscale[ii]) win_range = posit[ii] + np.arange(-np.floor(filt_len / 2), np.ceil(filt_len / 2)) win_range = (win_range % seq_len).astype(np.int) temp = np.fft.fft(c_l[ii]) * len(c_l[ii]) fs_new_bins = len(c_l[ii]) fk_bins = posit[ii] displace = int(fk_bins - np.floor(fk_bins / fs_new_bins) * fs_new_bins) temp = np.roll(temp, -displace) l = np.arange(len(temp) - np.floor(filt_len / 2), len(temp)) r = np.arange(np.ceil(filt_len / 2)) temp_idx = (np.concatenate((l, r)) % len(temp)).astype(np.int) temp = temp[temp_idx] lf = np.arange(filt_len - np.floor(filt_len / 2), filt_len) rf = np.arange(np.ceil(filt_len / 2)) filt_idx = np.concatenate((lf, rf)).astype(np.int) m = multiscale[ii][filt_idx] out[win_range] = out[win_range] + m * temp nyq_bin = np.floor(seq_len / 2) + 1 out_idx = np.arange( nyq_bin - np.abs(1 - seq_len % 2) - 1, 0, -1).astype(np.int) out[nyq_bin:] = np.conj(out[out_idx]) t_out = np.real(np.fft.ifft(out)).astype(np.float64) return t_out def cqt(X, fs, n_bins=48, fmin=27.5, fmax="nyq", gamma=20): """ Constant Q Transform References ---------- Velasco G. A., Holighaus N., Dorfler M., Grill T. Constructing an invertible constant-Q transform with nonstationary Gabor frames, Proceedings of the 14th International Conference on Digital Audio Effects (DAFx 11), Paris, France, 2011 Holighaus N., Dorfler M., Velasco G. A. and Grill T. A framework for invertible, real-time constant-Q transforms, submitted. 
Original matlab code copyright follows: AUTHOR(s) : Monika Dorfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011 COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA http://nuhag.eu/ Permission is granted to modify and re-distribute this code in any manner as long as this notice is preserved. All standard disclaimers apply. """ if fmax == "nyq": fmax = fs / 2. multiscale, shift, window_lens = nsgcwin(fmin, fmax, n_bins, fs, len(X), gamma) fbas = fs * np.cumsum(shift[1:]) / len(X) fbas = fbas[:len(window_lens) // 2 - 1] bins = window_lens.shape[0] // 2 - 1 window_lens[1:bins + 1] = window_lens[bins + 2] window_lens[bins + 2:] = window_lens[1:bins + 1][::-1] norm = 2. * window_lens[:bins + 2] / float(len(X)) norm = np.concatenate((norm, norm[1:-1][::-1])) multiscale = [norm[ii] * multiscale[ii] for ii in range(2 * (bins + 1))] c = nsgtf_real(X, multiscale, shift, window_lens) c_dc = c[0] c_nyq = c[-1] c_sub = c[1:-1] c = np.vstack(c_sub) return c, c_dc, c_nyq, multiscale, shift, window_lens def icqt(X_cq, c_dc, c_nyq, multiscale, shift, window_lens): """ Inverse constant Q Transform References ---------- Velasco G. A., Holighaus N., Dorfler M., Grill T. Constructing an invertible constant-Q transform with nonstationary Gabor frames, Proceedings of the 14th International Conference on Digital Audio Effects (DAFx 11), Paris, France, 2011 Holighaus N., Dorfler M., Velasco G. A. and Grill T. A framework for invertible, real-time constant-Q transforms, submitted. Original matlab code copyright follows: AUTHOR(s) : Monika Dorfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011 COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA http://nuhag.eu/ Permission is granted to modify and re-distribute this code in any manner as long as this notice is preserved. All standard disclaimers apply. """ new_multiscale = nsdual(multiscale, shift, window_lens) X = nsgitf_real(X_cq, c_dc, c_nyq, new_multiscale, shift) return X def rolling_mean(X, window_size): w = 1.0 / window_size * np.ones((window_size)) return np.correlate(X, w, 'valid') def rolling_window(X, window_size): # for 1d data shape = X.shape[:-1] + (X.shape[-1] - window_size + 1, window_size) strides = X.strides + (X.strides[-1],) return np.lib.stride_tricks.as_strided(X, shape=shape, strides=strides) def voiced_unvoiced(X, window_size=256, window_step=128, copy=True): """ Voiced unvoiced detection from a raw signal Based on code from: https://www.clear.rice.edu/elec532/PROJECTS96/lpc/code.html Other references: http://www.seas.ucla.edu/spapl/code/harmfreq_MOLRT_VAD.m Parameters ---------- X : ndarray Raw input signal window_size : int, optional (default=256) The window size to use, in samples. window_step : int, optional (default=128) How far the window steps after each calculation, in samples. copy : bool, optional (default=True) Whether to make a copy of the input array or allow in place changes. 
""" X = np.array(X, copy=copy) if len(X.shape) < 2: X = X[None] n_points = X.shape[1] n_windows = n_points // window_step # Padding pad_sizes = [(window_size - window_step) // 2, window_size - window_step // 2] # TODO: Handling for odd window sizes / steps X = np.hstack((np.zeros((X.shape[0], pad_sizes[0])), X, np.zeros((X.shape[0], pad_sizes[1])))) clipping_factor = 0.6 b, a = sg.butter(10, np.pi * 9 / 40) voiced_unvoiced = np.zeros((n_windows, 1)) period = np.zeros((n_windows, 1)) for window in range(max(n_windows - 1, 1)): XX = X.ravel()[window * window_step + np.arange(window_size)] XX *= sg.hamming(len(XX)) XX = sg.lfilter(b, a, XX) left_max = np.max(np.abs(XX[:len(XX) // 3])) right_max = np.max(np.abs(XX[-len(XX) // 3:])) clip_value = clipping_factor * np.min([left_max, right_max]) XX_clip = np.clip(XX, clip_value, -clip_value) XX_corr = np.correlate(XX_clip, XX_clip, mode='full') center = np.argmax(XX_corr) right_XX_corr = XX_corr[center:] prev_window = max([window - 1, 0]) if voiced_unvoiced[prev_window] > 0: # Want it to be harder to turn off than turn on strength_factor = .29 else: strength_factor = .3 start = np.where(right_XX_corr < .3 * XX_corr[center])[0] # 20 is hardcoded but should depend on samplerate? try: start = np.max([20, start[0]]) except IndexError: start = 20 search_corr = right_XX_corr[start:] index = np.argmax(search_corr) second_max = search_corr[index] if (second_max > strength_factor * XX_corr[center]): voiced_unvoiced[window] = 1 period[window] = start + index - 1 else: voiced_unvoiced[window] = 0 period[window] = 0 return np.array(voiced_unvoiced), np.array(period) def lpc_analysis(X, order=8, window_step=128, window_size=2 * 128, emphasis=0.9, voiced_start_threshold=.9, voiced_stop_threshold=.6, truncate=False, copy=True): """ Extract LPC coefficients from a signal Based on code from: http://labrosa.ee.columbia.edu/matlab/sws/ _rParameters ---------- X : ndarray Signals to extract LPC coefficients from order : int, optional (default=8) Order of the LPC coefficients. For speech, use the general rule that the order is two times the expected number of formants plus 2. This can be formulated as 2 + 2 * (fs // 2000). For approx. signals with fs = 7000, this is 8 coefficients - 2 + 2 * (7000 // 2000). window_step : int, optional (default=128) The size (in samples) of the space between each window window_size : int, optional (default=2 * 128) The size of each window (in samples) to extract coefficients over emphasis : float, optional (default=0.9) The emphasis coefficient to use for filtering voiced_start_threshold : float, optional (default=0.9) Upper power threshold for estimating when speech has started voiced_stop_threshold : float, optional (default=0.6) Lower power threshold for estimating when speech has stopped truncate : bool, optional (default=False) Whether to cut the data at the last window or do zero padding. copy : bool, optional (default=True) Whether to copy the input X or modify in place Returns ------- lp_coefficients : ndarray lp coefficients to describe the frame per_frame_gain : ndarray calculated gain for each frame residual_excitation : ndarray leftover energy which is not described by lp coefficents and gain voiced_frames : ndarray array of [0, 1] values which holds voiced/unvoiced decision for each frame. References ---------- D. P. W. 
Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab", Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/ """ X = np.array(X, copy=copy) if len(X.shape) < 2: X = X[None] n_points = X.shape[1] n_windows = int(n_points // window_step) if not truncate: pad_sizes = [(window_size - window_step) // 2, window_size - window_step // 2] # TODO: Handling for odd window sizes / steps X = np.hstack((np.zeros((X.shape[0], pad_sizes[0])), X, np.zeros((X.shape[0], pad_sizes[1])))) else: pad_sizes = [0, 0] X = X[0, :n_windows * window_step] lp_coefficients = np.zeros((n_windows, order + 1)) per_frame_gain = np.zeros((n_windows, 1)) residual_excitation = np.zeros( ((n_windows - 1) * window_step + window_size)) # Pre-emphasis high-pass filter X = sg.lfilter([1, -emphasis], 1, X) # stride_tricks.as_strided? autocorr_X = np.zeros((n_windows, 2 * window_size - 1)) for window in range(max(n_windows - 1, 1)): wtws = int(window * window_step) XX = X.ravel()[wtws + np.arange(window_size, dtype="int32")] WXX = XX * sg.hanning(window_size) autocorr_X[window] = np.correlate(WXX, WXX, mode='full') center = np.argmax(autocorr_X[window]) RXX = autocorr_X[window, np.arange(center, window_size + order, dtype="int32")] R = linalg.toeplitz(RXX[:-1]) solved_R = linalg.pinv(R).dot(RXX[1:]) filter_coefs = np.hstack((1, -solved_R)) residual_signal = sg.lfilter(filter_coefs, 1, WXX) gain = np.sqrt(np.mean(residual_signal ** 2)) lp_coefficients[window] = filter_coefs per_frame_gain[window] = gain assign_range = wtws + np.arange(window_size, dtype="int32") residual_excitation[assign_range] += residual_signal / gain # Throw away first part in overlap mode for proper synthesis residual_excitation = residual_excitation[pad_sizes[0]:] return lp_coefficients, per_frame_gain, residual_excitation def lpc_to_frequency(lp_coefficients, per_frame_gain): """ Extract resonant frequencies and magnitudes from LPC coefficients and gains. Parameters ---------- lp_coefficients : ndarray LPC coefficients, such as those calculated by ``lpc_analysis`` per_frame_gain : ndarray Gain calculated for each frame, such as those calculated by ``lpc_analysis`` Returns ------- frequencies : ndarray Resonant frequencies calculated from LPC coefficients and gain. Returned frequencies are from 0 to 2 * pi magnitudes : ndarray Magnitudes of resonant frequencies References ---------- D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab", Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/ """ n_windows, order = lp_coefficients.shape frame_frequencies = np.zeros((n_windows, (order - 1) // 2)) frame_magnitudes = np.zeros_like(frame_frequencies) for window in range(n_windows): w_coefs = lp_coefficients[window] g_coefs = per_frame_gain[window] roots = np.roots(np.hstack(([1], w_coefs[1:]))) # Roots doesn't return the same thing as MATLAB... agh frequencies, index = np.unique( np.abs(np.angle(roots)), return_index=True) # Make sure 0 doesn't show up... gtz = np.where(frequencies > 0)[0] frequencies = frequencies[gtz] index = index[gtz] magnitudes = g_coefs / (1. 
- np.abs(roots)) sort_index = np.argsort(frequencies) frame_frequencies[window, :len(sort_index)] = frequencies[sort_index] frame_magnitudes[window, :len(sort_index)] = magnitudes[sort_index] return frame_frequencies, frame_magnitudes def lpc_to_lsf(all_lpc): if len(all_lpc.shape) < 2: all_lpc = all_lpc[None] order = all_lpc.shape[1] - 1 all_lsf = np.zeros((len(all_lpc), order)) for i in range(len(all_lpc)): lpc = all_lpc[i] lpc1 = np.append(lpc, 0) lpc2 = lpc1[::-1] sum_filt = lpc1 + lpc2 diff_filt = lpc1 - lpc2 if order % 2 != 0: deconv_diff, _ = sg.deconvolve(diff_filt, [1, 0, -1]) deconv_sum = sum_filt else: deconv_diff, _ = sg.deconvolve(diff_filt, [1, -1]) deconv_sum, _ = sg.deconvolve(sum_filt, [1, 1]) roots_diff = np.roots(deconv_diff) roots_sum = np.roots(deconv_sum) angle_diff = np.angle(roots_diff[::2]) angle_sum = np.angle(roots_sum[::2]) lsf = np.sort(np.hstack((angle_diff, angle_sum))) if len(lsf) != 0: all_lsf[i] = lsf return np.squeeze(all_lsf) def lsf_to_lpc(all_lsf): if len(all_lsf.shape) < 2: all_lsf = all_lsf[None] order = all_lsf.shape[1] all_lpc = np.zeros((len(all_lsf), order + 1)) for i in range(len(all_lsf)): lsf = all_lsf[i] zeros = np.exp(1j * lsf) sum_zeros = zeros[::2] diff_zeros = zeros[1::2] sum_zeros = np.hstack((sum_zeros, np.conj(sum_zeros))) diff_zeros = np.hstack((diff_zeros, np.conj(diff_zeros))) sum_filt = np.poly(sum_zeros) diff_filt = np.poly(diff_zeros) if order % 2 != 0: deconv_diff = sg.convolve(diff_filt, [1, 0, -1]) deconv_sum = sum_filt else: deconv_diff = sg.convolve(diff_filt, [1, -1]) deconv_sum = sg.convolve(sum_filt, [1, 1]) lpc = .5 * (deconv_sum + deconv_diff) # Last coefficient is 0 and not returned all_lpc[i] = lpc[:-1] return np.squeeze(all_lpc) def lpc_synthesis(lp_coefficients, per_frame_gain, residual_excitation=None, voiced_frames=None, window_step=128, emphasis=0.9): """ Synthesize a signal from LPC coefficients Based on code from: http://labrosa.ee.columbia.edu/matlab/sws/ http://web.uvic.ca/~tyoon/resource/auditorytoolbox/auditorytoolbox/synlpc.html Parameters ---------- lp_coefficients : ndarray Linear prediction coefficients per_frame_gain : ndarray Gain coefficients residual_excitation : ndarray or None, optional (default=None) Residual excitations. If None, this will be synthesized with white noise voiced_frames : ndarray or None, optional (default=None) Voiced frames. If None, all frames assumed to be voiced. window_step : int, optional (default=128) The size (in samples) of the space between each window emphasis : float, optional (default=0.9) The emphasis coefficient to use for filtering overlap_add : bool, optional (default=True) What type of processing to use when joining windows copy : bool, optional (default=True) Whether to copy the input X or modify in place Returns ------- synthesized : ndarray Sound vector synthesized from input arguments References ---------- D. P. W. 
Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab", Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/ """ # TODO: Incorporate better synthesis from # http://eecs.oregonstate.edu/education/docs/ece352/CompleteManual.pdf window_size = 2 * window_step [n_windows, order] = lp_coefficients.shape n_points = (n_windows + 1) * window_step n_excitation_points = n_points + window_step + window_step // 2 random_state = np.random.RandomState(1999) if residual_excitation is None: # Need to generate excitation if voiced_frames is None: # No voiced/unvoiced info voiced_frames = np.ones((lp_coefficients.shape[0], 1)) residual_excitation = np.zeros((n_excitation_points)) f, m = lpc_to_frequency(lp_coefficients, per_frame_gain) t = np.linspace(0, 1, window_size, endpoint=False) hanning = sg.hanning(window_size) for window in range(n_windows): window_base = window * window_step index = window_base + np.arange(window_size) if voiced_frames[window]: sig = np.zeros_like(t) cycles = np.cumsum(f[window][0] * t) sig += sg.sawtooth(cycles, 0.001) residual_excitation[index] += hanning * sig residual_excitation[index] += hanning * 0.01 * random_state.randn( window_size) else: n_excitation_points = residual_excitation.shape[0] n_points = n_excitation_points + window_step + window_step // 2 residual_excitation = np.hstack((residual_excitation, np.zeros(window_size))) if voiced_frames is None: voiced_frames = np.ones_like(per_frame_gain) synthesized = np.zeros((n_points)) for window in range(n_windows): window_base = window * window_step oldbit = synthesized[window_base + np.arange(window_step)] w_coefs = lp_coefficients[window] if not np.all(w_coefs): # Hack to make lfilter avoid # ValueError: BUG: filter coefficient a[0] == 0 not supported yet # when all coeffs are 0 w_coefs = [1] g_coefs = voiced_frames[window] * per_frame_gain[window] index = window_base + np.arange(window_size) newbit = g_coefs * sg.lfilter([1], w_coefs, residual_excitation[index]) synthesized[index] = np.hstack((oldbit, np.zeros( (window_size - window_step)))) synthesized[index] += sg.hanning(window_size) * newbit synthesized = sg.lfilter([1], [1, -emphasis], synthesized) return synthesized def soundsc(X, gain_scale=.9, copy=True): """ Approximate implementation of soundsc from MATLAB without the audio playing. Parameters ---------- X : ndarray Signal to be rescaled gain_scale : float Gain multipler, default .9 (90% of maximum representation) copy : bool, optional (default=True) Whether to make a copy of input signal or operate in place. 
Returns ------- X_sc : ndarray (-32767, 32767) scaled version of X as int16, suitable for writing with scipy.io.wavfile """ X = np.array(X, copy=copy) X = (X - X.min()) / (X.max() - X.min()) X = 2 * X - 1 X = gain_scale * X X = X * 2 ** 15 return X.astype('int16') def _wav2array(nchannels, sampwidth, data): # wavio.py # Author: Warren Weckesser # License: BSD 3-Clause (http://opensource.org/licenses/BSD-3-Clause) """data must be the string containing the bytes from the wav file.""" num_samples, remainder = divmod(len(data), sampwidth * nchannels) if remainder > 0: raise ValueError('The length of data is not a multiple of ' 'sampwidth * num_channels.') if sampwidth > 4: raise ValueError("sampwidth must not be greater than 4.") if sampwidth == 3: a = np.empty((num_samples, nchannels, 4), dtype=np.uint8) raw_bytes = np.fromstring(data, dtype=np.uint8) a[:, :, :sampwidth] = raw_bytes.reshape(-1, nchannels, sampwidth) a[:, :, sampwidth:] = (a[:, :, sampwidth - 1:sampwidth] >> 7) * 255 result = a.view(' 0 weights = np.zeros((n_filts, n_fft)) fft_freqs = np.arange(n_fft // 2) / n_fft * fs min_mel = herz_to_mel(min_freq) max_mel = herz_to_mel(max_freq) partial = np.arange(n_filts + 2) / (n_filts + 1.) * (max_mel - min_mel) bin_freqs = mel_to_herz(min_mel + partial) bin_bin = np.round(bin_freqs / fs * (n_fft - 1)) for i in range(n_filts): fs_i = bin_freqs[i + np.arange(3)] fs_i = fs_i[1] + width * (fs_i - fs_i[1]) lo_slope = (fft_freqs - fs_i[0]) / float(fs_i[1] - fs_i[0]) hi_slope = (fs_i[2] - fft_freqs) / float(fs_i[2] - fs_i[1]) weights[i, :n_fft // 2] = np.maximum( 0, np.minimum(lo_slope, hi_slope)) # Constant amplitude multiplier weights = np.diag(2. / (bin_freqs[2:n_filts + 2] - bin_freqs[:n_filts])).dot(weights) weights[:, n_fft // 2:] = 0 return weights def time_attack_agc(X, fs, t_scale=0.5, f_scale=1.): """ AGC based on code by Dan Ellis http://labrosa.ee.columbia.edu/matlab/tf_agc/ """ # 32 ms grid for FFT n_fft = 2 ** int(np.log(0.032 * fs) / np.log(2)) f_scale = float(f_scale) window_size = n_fft window_step = window_size // 2 X_freq = stft(X, window_size, mean_normalize=False) fft_fs = fs / window_step n_bands = max(10, 20 / f_scale) mel_width = f_scale * n_bands / 10. f_to_a = mel_freq_weights(n_fft, fs, n_bands, mel_width) f_to_a = f_to_a[:, :n_fft // 2 + 1] audiogram = np.abs(X_freq).dot(f_to_a.T) fbg = np.zeros_like(audiogram) state = np.zeros((audiogram.shape[1],)) alpha = np.exp(-(1. / fft_fs) / t_scale) for i in range(len(audiogram)): state = np.maximum(alpha * state, audiogram[i]) fbg[i] = state sf_to_a = np.sum(f_to_a, axis=0) E = np.diag(1. / (sf_to_a + (sf_to_a == 0))) E = E.dot(f_to_a.T) E = fbg.dot(E.T) E[E <= 0] = np.min(E[E > 0]) ts = istft(X_freq / E, window_size, mean_normalize=False) return ts, X_freq, E def hebbian_kmeans(X, n_clusters=10, n_epochs=10, W=None, learning_rate=0.01, batch_size=100, random_state=None, verbose=True): """ Modified from existing code from R. 
Memisevic
    See http://www.cs.toronto.edu/~rfm/code/hebbian_kmeans.py
    (a usage sketch, demo_hebbian_kmeans, is appended at the end of this file)
    """
    if W is None:
        if random_state is None:
            random_state = np.random.RandomState()
        W = 0.1 * random_state.randn(n_clusters, X.shape[1])
    else:
        assert n_clusters == W.shape[0]
    X2 = (X ** 2).sum(axis=1, keepdims=True)
    last_print = 0
    for e in range(n_epochs):
        for i in range(0, X.shape[0], batch_size):
            X_i = X[i: i + batch_size]
            X2_i = X2[i: i + batch_size]
            D = -2 * np.dot(W, X_i.T)
            D += (W ** 2).sum(axis=1, keepdims=True)
            D += X2_i.T
            S = (D == D.min(axis=0)[None, :]).astype("float").T
            W += learning_rate * (
                np.dot(S.T, X_i) - S.sum(axis=0)[:, None] * W)
        if verbose:
            if e == 0 or e > (.05 * n_epochs + last_print):
                last_print = e
                print("Epoch %i of %i, cost %.4f" % (
                    e + 1, n_epochs, D.min(axis=0).sum()))
    return W


def complex_to_real_view(arr_c):
    # Inplace view from complex to r, i as separate columns
    assert arr_c.dtype in [np.complex64, np.complex128]
    shp = arr_c.shape
    dtype = np.float64 if arr_c.dtype == np.complex128 else np.float32
    arr_r = arr_c.ravel().view(dtype=dtype).reshape(shp[0], 2 * shp[1])
    return arr_r


def real_to_complex_view(arr_r):
    # Inplace view from real, imag as columns to complex
    assert arr_r.dtype not in [np.complex64, np.complex128]
    shp = arr_r.shape
    dtype = np.complex128 if arr_r.dtype == np.float64 else np.complex64
    arr_c = arr_r.ravel().view(dtype=dtype).reshape(shp[0], shp[1] // 2)
    return arr_c


def complex_to_abs(arr_c):
    return np.abs(arr_c)


def complex_to_angle(arr_c):
    return np.angle(arr_c)


def abs_and_angle_to_complex(arr_abs, arr_angle):
    # abs(f_c2 - f_c) < 1E-15
    return arr_abs * np.exp(1j * arr_angle)


def angle_to_sin_cos(arr_angle):
    return np.hstack((np.sin(arr_angle), np.cos(arr_angle)))


def sin_cos_to_angle(arr_sin, arr_cos):
    return np.arctan2(arr_sin, arr_cos)


def polyphase_core(x, m, f):
    # x = input data
    # m = decimation rate
    # f = filter
    # Hack job - append zeros to match decimation rate
    if x.shape[0] % m != 0:
        x = np.append(x, np.zeros((m - x.shape[0] % m,)))
    if f.shape[0] % m != 0:
        f = np.append(f, np.zeros((m - f.shape[0] % m,)))
    # integer divide so the shape stays an int
    polyphase = p = np.zeros((m, (x.shape[0] + f.shape[0]) // m),
                             dtype=x.dtype)
    p[0, :-1] = np.convolve(x[::m], f[::m])
    # Invert the x values when applying filters
    for i in range(1, m):
        p[i, 1:] = np.convolve(x[m - i::m], f[i::m])
    return p


def polyphase_single_filter(x, m, f):
    return np.sum(polyphase_core(x, m, f), axis=0)


def polyphase_lowpass(arr, downsample=2, n_taps=50, filter_pad=1.1):
    filt = firwin(downsample * n_taps, 1 / (downsample * filter_pad))
    filtered = polyphase_single_filter(arr, downsample, filt)
    return filtered


def window(arr, window_size, window_step=1, axis=0):
    """
    Directly taken from Erik Rigtorp's post to numpy-discussion.
""" if window_size < 1: raise ValueError("`window_size` must be at least 1.") if window_size > arr.shape[-1]: raise ValueError("`window_size` is too long.") orig = list(range(len(arr.shape))) trans = list(range(len(arr.shape))) trans[axis] = orig[-1] trans[-1] = orig[axis] arr = arr.transpose(trans) shape = arr.shape[:-1] + (arr.shape[-1] - window_size + 1, window_size) strides = arr.strides + (arr.strides[-1],) strided = as_strided(arr, shape=shape, strides=strides) if window_step > 1: strided = strided[..., ::window_step, :] orig = list(range(len(strided.shape))) trans = list(range(len(strided.shape))) trans[-2] = orig[-1] trans[-1] = orig[-2] trans = trans[::-1] strided = strided.transpose(trans) return strided def unwindow(arr, window_size, window_step=1, axis=0): # undo windows by broadcast if axis != 0: raise ValueError("axis != 0 currently unsupported") shp = arr.shape unwindowed = np.tile(arr[:, None, ...], (1, window_step, 1, 1)) unwindowed = unwindowed.reshape(shp[0] * window_step, *shp[1:]) return unwindowed.mean(axis=1) def xcorr_offset(x1, x2): """ Under MSR-LA License Based on MATLAB implementation from Spectrogram Inversion Toolbox References ---------- D. Griffin and J. Lim. Signal estimation from modified short-time Fourier transform. IEEE Trans. Acoust. Speech Signal Process., 32(2):236-243, 1984. Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory Model Inversion for Sound Separation. Proc. IEEE-ICASSP, Adelaide, 1994, II.77-80. Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal Estimation from Modified Short-Time Fourier Transform Magnitude Spectra. IEEE Transactions on Audio Speech and Language Processing, 08/2007. """ x1 = x1 - x1.mean() x2 = x2 - x2.mean() frame_size = len(x2) half = frame_size // 2 corrs = np.convolve(x1.astype('float32'), x2[::-1].astype('float32')) corrs[:half] = -1E30 corrs[-half:] = -1E30 offset = corrs.argmax() - len(x1) return offset def invert_spectrogram(X_s, step, calculate_offset=True, set_zero_phase=True): """ Under MSR-LA License Based on MATLAB implementation from Spectrogram Inversion Toolbox References ---------- D. Griffin and J. Lim. Signal estimation from modified short-time Fourier transform. IEEE Trans. Acoust. Speech Signal Process., 32(2):236-243, 1984. Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory Model Inversion for Sound Separation. Proc. IEEE-ICASSP, Adelaide, 1994, II.77-80. Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal Estimation from Modified Short-Time Fourier Transform Magnitude Spectra. IEEE Transactions on Audio Speech and Language Processing, 08/2007. """ size = int(X_s.shape[1] // 2) wave = np.zeros((X_s.shape[0] * step + size)) # Getting overflow warnings with 32 bit... wave = wave.astype('float64') total_windowing_sum = np.zeros((X_s.shape[0] * step + size)) win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1)) est_start = int(size // 2) - 1 est_end = est_start + size for i in range(X_s.shape[0]): wave_start = int(step * i) wave_end = wave_start + size if set_zero_phase: spectral_slice = X_s[i].real + 0j else: # already complex spectral_slice = X_s[i] # Don't need fftshift due to different impl. wave_est = np.real(np.fft.ifft(spectral_slice))[::-1] if calculate_offset and i > 0: offset_size = size - step if offset_size <= 0: print("WARNING: Large step size >50\% detected! 
" "This code works best with high overlap - try " "with 75% or greater") offset_size = step offset = xcorr_offset(wave[wave_start:wave_start + offset_size], wave_est[est_start:est_start + offset_size]) else: offset = 0 wave[wave_start:wave_end] += win * wave_est[ est_start - offset:est_end - offset] total_windowing_sum[wave_start:wave_end] += win wave = np.real(wave) / (total_windowing_sum + 1E-6) return wave def iterate_invert_spectrogram(X_s, fftsize, step, n_iter=10, verbose=False, complex_input=False): """ Under MSR-LA License Based on MATLAB implementation from Spectrogram Inversion Toolbox References ---------- D. Griffin and J. Lim. Signal estimation from modified short-time Fourier transform. IEEE Trans. Acoust. Speech Signal Process., 32(2):236-243, 1984. Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory Model Inversion for Sound Separation. Proc. IEEE-ICASSP, Adelaide, 1994, II.77-80. Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal Estimation from Modified Short-Time Fourier Transform Magnitude Spectra. IEEE Transactions on Audio Speech and Language Processing, 08/2007. """ reg = np.max(X_s) / 1E8 X_best = copy.deepcopy(X_s) try: for i in range(n_iter): if verbose: print("Runnning iter %i" % i) if i == 0 and not complex_input: X_t = invert_spectrogram(X_best, step, calculate_offset=True, set_zero_phase=True) else: # Calculate offset was False in the MATLAB version # but in mine it massively improves the result # Possible bug in my impl? X_t = invert_spectrogram(X_best, step, calculate_offset=True, set_zero_phase=False) est = stft(X_t, fftsize=fftsize, step=step, compute_onesided=False) phase = est / np.maximum(reg, np.abs(est)) phase = phase[:len(X_s)] X_s = X_s[:len(phase)] X_best = X_s * phase except ValueError: raise ValueError("The iterate_invert_spectrogram algorithm requires" " stft(..., compute_onesided=False),", " be sure you have calculated stft with this argument") X_t = invert_spectrogram(X_best, step, calculate_offset=True, set_zero_phase=False) return np.real(X_t) def implot(arr, scale=None, title="", cmap="gray"): import matplotlib.pyplot as plt if scale is "specgram": # plotting part mag = 20. 
* np.log10(np.abs(arr)) # Transpose so time is X axis, and invert y axis so # frequency is low at bottom mag = mag.T[::-1, :] else: mag = arr f, ax = plt.subplots() ax.matshow(mag, cmap=cmap) plt.axis("off") x1 = mag.shape[0] y1 = mag.shape[1] def autoaspect(x_range, y_range): """ The aspect to make a plot square with ax.set_aspect in Matplotlib """ mx = max(x_range, y_range) mn = min(x_range, y_range) if x_range <= y_range: return mx / float(mn) else: return mn / float(mx) asp = autoaspect(x1, y1) ax.set_aspect(asp) plt.title(title) def test_lpc_to_lsf(): # Matlab style vectors for testing # lsf = [0.7842 1.5605 1.8776 1.8984 2.3593] # a = [1.0000 0.6149 0.9899 0.0000 0.0031 -0.0082]; lsf = [[0.7842, 1.5605, 1.8776, 1.8984, 2.3593], [0.7842, 1.5605, 1.8776, 1.8984, 2.3593]] a = [[1.0000, 0.6149, 0.9899, 0.0000, 0.0031, -0.0082], [1.0000, 0.6149, 0.9899, 0.0000, 0.0031, -0.0082]] a = np.array(a) lsf = np.array(lsf) lsf_r = lpc_to_lsf(a) assert_almost_equal(lsf, lsf_r, decimal=4) a_r = lsf_to_lpc(lsf) assert_almost_equal(a, a_r, decimal=4) lsf_r = lpc_to_lsf(a[0]) assert_almost_equal(lsf[0], lsf_r, decimal=4) a_r = lsf_to_lpc(lsf[0]) assert_almost_equal(a[0], a_r, decimal=4) def test_lpc_analysis_truncate(): # Test that truncate doesn't crash and actually truncates [a, g, e] = lpc_analysis(np.random.randn(85), order=8, window_step=80, window_size=80, emphasis=0.9, truncate=True) assert(a.shape[0] == 1) def test_feature_build(): samplerate, X = fetch_sample_music() # MATLAB wavread does normalization X = X.astype('float32') / (2 ** 15) wsz = 256 wst = 128 a, g, e = lpc_analysis(X, order=8, window_step=wst, window_size=wsz, emphasis=0.9, copy=True) v, p = voiced_unvoiced(X, window_size=wsz, window_step=wst) c = compress(e, n_components=64) # First component of a is always 1 combined = np.hstack((a[:, 1:], g, c[:a.shape[0]])) features = np.zeros((a.shape[0], 2 * combined.shape[1])) start_indices = v * combined.shape[1] start_indices = start_indices.astype('int32') end_indices = (v + 1) * combined.shape[1] end_indices = end_indices.astype('int32') for i in range(features.shape[0]): features[i, start_indices[i]:end_indices[i]] = combined[i] def test_mdct_and_inverse(): fs, X = fetch_sample_music() X_dct = mdct_slow(X) X_r = imdct_slow(X_dct) assert np.all(np.abs(X_r[:len(X)] - X) < 1E-3) assert np.abs(X_r[:len(X)] - X).mean() < 1E-6 def test_all(): test_lpc_analysis_truncate() test_feature_build() test_lpc_to_lsf() test_mdct_and_inverse() def run_lpc_example(): # ae.wav is from # http://www.linguistics.ucla.edu/people/hayes/103/Charts/VChart/ae.wav # Partially following the formant tutorial here # http://www.mathworks.com/help/signal/ug/formant-estimation-with-lpc-coefficients.html samplerate, X = fetch_sample_music() c = overlap_compress(X, 200, 400) X_r = overlap_uncompress(c, 400) wavfile.write('lpc_uncompress.wav', samplerate, soundsc(X_r)) print("Calculating sinusoids") f_hz, m = sinusoid_analysis(X, input_sample_rate=16000) Xs_sine = sinusoid_synthesis(f_hz, m) orig_fname = 'lpc_orig.wav' sine_fname = 'lpc_sine_synth.wav' wavfile.write(orig_fname, samplerate, soundsc(X)) wavfile.write(sine_fname, samplerate, soundsc(Xs_sine)) lpc_order_list = [8, ] dct_components_list = [200, ] window_size_list = [400, ] # Seems like a dct component size of ~2/3rds the step # (1/3rd the window for 50% overlap) works well. 
for lpc_order in lpc_order_list: for dct_components in dct_components_list: for window_size in window_size_list: # 50% overlap window_step = window_size // 2 a, g, e = lpc_analysis(X, order=lpc_order, window_step=window_step, window_size=window_size, emphasis=0.9, copy=True) print("Calculating LSF") lsf = lpc_to_lsf(a) # Not window_size - window_step! Need to implement overlap print("Calculating compression") c = compress(e, n_components=dct_components, window_size=window_step) co = overlap_compress(e, n_components=dct_components, window_size=window_step) block_excitation = uncompress(c, window_size=window_step) overlap_excitation = overlap_uncompress(co, window_size=window_step) a_r = lsf_to_lpc(lsf) f, m = lpc_to_frequency(a_r, g) block_lpc = lpc_synthesis(a_r, g, block_excitation, emphasis=0.9, window_step=window_step) overlap_lpc = lpc_synthesis(a_r, g, overlap_excitation, emphasis=0.9, window_step=window_step) v, p = voiced_unvoiced(X, window_size=window_size, window_step=window_step) noisy_lpc = lpc_synthesis(a_r, g, voiced_frames=v, emphasis=0.9, window_step=window_step) if dct_components is None: dct_components = window_size noisy_fname = 'lpc_noisy_synth_%iwin_%ilpc_%idct.wav' % ( window_size, lpc_order, dct_components) block_fname = 'lpc_block_synth_%iwin_%ilpc_%idct.wav' % ( window_size, lpc_order, dct_components) overlap_fname = 'lpc_overlap_synth_%iwin_%ilpc_%idct.wav' % ( window_size, lpc_order, dct_components) wavfile.write(noisy_fname, samplerate, soundsc(noisy_lpc)) wavfile.write(block_fname, samplerate, soundsc(block_lpc)) wavfile.write(overlap_fname, samplerate, soundsc(overlap_lpc)) def run_fft_vq_example(): n_fft = 512 time_smoothing = 4 def _pre(list_of_data): f_c = np.vstack([stft(dd, n_fft) for dd in list_of_data]) if len(f_c) % time_smoothing != 0: newlen = len(f_c) - len(f_c) % time_smoothing f_c = f_c[:newlen] f_mag = complex_to_abs(f_c) f_phs = complex_to_angle(f_c) f_sincos = angle_to_sin_cos(f_phs) f_r = np.hstack((f_mag, f_sincos)) f_r = f_r.reshape((len(f_r) // time_smoothing, time_smoothing * f_r.shape[1])) return f_r, n_fft def preprocess_train(list_of_data, random_state): f_r, n_fft = _pre(list_of_data) clusters = f_r return clusters def apply_preprocess(list_of_data, clusters): f_r, n_fft = _pre(list_of_data) memberships, distances = vq(f_r, clusters) vq_r = clusters[memberships] vq_r = vq_r.reshape((time_smoothing * len(vq_r), vq_r.shape[1] // time_smoothing)) f_mag = vq_r[:, :n_fft // 2 + 1] f_sincos = vq_r[:, n_fft // 2 + 1:] extent = f_sincos.shape[1] // 2 f_phs = sin_cos_to_angle(f_sincos[:, :extent], f_sincos[:, extent:]) vq_c = abs_and_angle_to_complex(f_mag, f_phs) d_k = istft(vq_c, fftsize=n_fft) return d_k random_state = np.random.RandomState(1999) """ fs, d = fetch_sample_music() sub = int(.8 * d.shape[0]) d1 = [d[:sub]] d2 = [d[sub:]] """ fs, d = fetch_sample_speech_fruit() d1 = d[::8] + d[1::8] + d[2::8] + d[3::8] + d[4::8] + d[5::8] + d[6::8] d2 = d[7::8] # make sure d1 and d2 aren't the same! 
    assert [len(di) for di in d1] != [len(di) for di in d2]

    clusters = preprocess_train(d1, random_state)
    # Training data
    vq_d1 = apply_preprocess(d1, clusters)
    vq_d2 = apply_preprocess(d2, clusters)
    assert [i != j for i, j in zip(vq_d1.ravel(), vq_d2.ravel())]

    fix_d1 = np.concatenate(d1)
    fix_d2 = np.concatenate(d2)
    wavfile.write("fft_train_no_agc.wav", fs, soundsc(fix_d1))
    wavfile.write("fft_test_no_agc.wav", fs, soundsc(fix_d2))
    # soundsc takes a gain scale as its second argument - don't pass fs
    wavfile.write("fft_vq_train_no_agc.wav", fs, soundsc(vq_d1))
    wavfile.write("fft_vq_test_no_agc.wav", fs, soundsc(vq_d2))

    agc_d1, freq_d1, energy_d1 = time_attack_agc(fix_d1, fs, .5, 5)
    agc_d2, freq_d2, energy_d2 = time_attack_agc(fix_d2, fs, .5, 5)
    agc_vq_d1, freq_vq_d1, energy_vq_d1 = time_attack_agc(vq_d1, fs, .5, 5)
    agc_vq_d2, freq_vq_d2, energy_vq_d2 = time_attack_agc(vq_d2, fs, .5, 5)
    wavfile.write("fft_train_agc.wav", fs, soundsc(agc_d1))
    wavfile.write("fft_test_agc.wav", fs, soundsc(agc_d2))
    wavfile.write("fft_vq_train_agc.wav", fs, soundsc(agc_vq_d1))
    wavfile.write("fft_vq_test_agc.wav", fs, soundsc(agc_vq_d2))


def run_dct_vq_example():
    def _pre(list_of_data):
        # Temporal window setting is crucial! - 512 seems OK for music, 256
        # fruit perhaps due to samplerates
        n_dct = 512
        f_r = np.vstack([mdct_slow(dd, n_dct) for dd in list_of_data])
        return f_r, n_dct

    def preprocess_train(list_of_data, random_state):
        f_r, n_dct = _pre(list_of_data)
        clusters = f_r
        return clusters

    def apply_preprocess(list_of_data, clusters):
        f_r, n_dct = _pre(list_of_data)
        f_clust = f_r
        memberships, distances = vq(f_clust, clusters)
        vq_r = clusters[memberships]
        d_k = imdct_slow(vq_r, n_dct)
        return d_k

    random_state = np.random.RandomState(1999)

    # This doesn't work very well due to only taking a sample from the end as
    # test
    fs, d = fetch_sample_music()
    sub = int(.8 * d.shape[0])
    d1 = [d[:sub]]
    d2 = [d[sub:]]

    """
    fs, d = fetch_sample_speech_fruit()
    d1 = d[::8] + d[1::8] + d[2::8] + d[3::8] + d[4::8] + d[5::8] + d[6::8]
    d2 = d[7::8]
    # make sure d1 and d2 aren't the same!
    assert [len(di) for di in d1] != [len(di) for di in d2]
    """

    clusters = preprocess_train(d1, random_state)
    # Training data
    vq_d1 = apply_preprocess(d1, clusters)
    vq_d2 = apply_preprocess(d2, clusters)
    # compare train against test, not test against itself
    assert [i != j for i, j in zip(vq_d1.ravel(), vq_d2.ravel())]

    fix_d1 = np.concatenate(d1)
    fix_d2 = np.concatenate(d2)
    wavfile.write("dct_train_no_agc.wav", fs, soundsc(fix_d1))
    wavfile.write("dct_test_no_agc.wav", fs, soundsc(fix_d2))
    wavfile.write("dct_vq_train_no_agc.wav", fs, soundsc(vq_d1))
    wavfile.write("dct_vq_test_no_agc.wav", fs, soundsc(vq_d2))

    """
    import matplotlib.pyplot as plt
    plt.specgram(vq_d2, cmap="gray")
    plt.figure()
    plt.specgram(fix_d2, cmap="gray")
    plt.show()
    """

    agc_d1, freq_d1, energy_d1 = time_attack_agc(fix_d1, fs, .5, 5)
    agc_d2, freq_d2, energy_d2 = time_attack_agc(fix_d2, fs, .5, 5)
    agc_vq_d1, freq_vq_d1, energy_vq_d1 = time_attack_agc(vq_d1, fs, .5, 5)
    agc_vq_d2, freq_vq_d2, energy_vq_d2 = time_attack_agc(vq_d2, fs, .5, 5)
    wavfile.write("dct_train_agc.wav", fs, soundsc(agc_d1))
    wavfile.write("dct_test_agc.wav", fs, soundsc(agc_d2))
    wavfile.write("dct_vq_train_agc.wav", fs, soundsc(agc_vq_d1))
    wavfile.write("dct_vq_test_agc.wav", fs, soundsc(agc_vq_d2))


def run_phase_reconstruction_example():
    fs, d = fetch_sample_speech_tapestry()
    # actually gives however many components you say!
So double what .m file # says fftsize = 512 step = 64 X_s = np.abs(stft(d, fftsize=fftsize, step=step, real=False, compute_onesided=False)) X_t = iterate_invert_spectrogram(X_s, fftsize, step, verbose=True) """ import matplotlib.pyplot as plt plt.specgram(d, cmap="gray") plt.savefig("1.png") plt.close() plt.imshow(X_s, cmap="gray") plt.savefig("2.png") plt.close() """ wavfile.write("phase_original.wav", fs, soundsc(d)) wavfile.write("phase_reconstruction.wav", fs, soundsc(X_t)) def run_phase_vq_example(): def _pre(list_of_data): # Temporal window setting is crucial! - 512 seems OK for music, 256 # fruit perhaps due to samplerates n_fft = 256 step = 32 f_r = np.vstack([np.abs(stft(dd, n_fft, step=step, real=False, compute_onesided=False)) for dd in list_of_data]) return f_r, n_fft, step def preprocess_train(list_of_data, random_state): f_r, n_fft, step = _pre(list_of_data) clusters = copy.deepcopy(f_r) return clusters def apply_preprocess(list_of_data, clusters): f_r, n_fft, step = _pre(list_of_data) f_clust = f_r # Nondeterministic ? memberships, distances = vq(f_clust, clusters) vq_r = clusters[memberships] d_k = iterate_invert_spectrogram(vq_r, n_fft, step, verbose=True) return d_k random_state = np.random.RandomState(1999) fs, d = fetch_sample_speech_fruit() d1 = d[::9] d2 = d[7::8][:5] # make sure d1 and d2 aren't the same! assert [len(di) for di in d1] != [len(di) for di in d2] clusters = preprocess_train(d1, random_state) fix_d1 = np.concatenate(d1) fix_d2 = np.concatenate(d2) vq_d2 = apply_preprocess(d2, clusters) wavfile.write("phase_train_no_agc.wav", fs, soundsc(fix_d1)) wavfile.write("phase_vq_test_no_agc.wav", fs, soundsc(vq_d2)) agc_d1, freq_d1, energy_d1 = time_attack_agc(fix_d1, fs, .5, 5) agc_d2, freq_d2, energy_d2 = time_attack_agc(fix_d2, fs, .5, 5) agc_vq_d2, freq_vq_d2, energy_vq_d2 = time_attack_agc(vq_d2, fs, .5, 5) """ import matplotlib.pyplot as plt plt.specgram(agc_vq_d2, cmap="gray") #plt.title("Fake") plt.figure() plt.specgram(agc_d2, cmap="gray") #plt.title("Real") plt.show() """ wavfile.write("phase_train_agc.wav", fs, soundsc(agc_d1)) wavfile.write("phase_test_agc.wav", fs, soundsc(agc_d2)) wavfile.write("phase_vq_test_agc.wav", fs, soundsc(agc_vq_d2)) def run_cqt_example(): try: fs, d = fetch_sample_file("/Users/User/cqt_resources/kempff1.wav") except ValueError: print("WARNING: Using sample music instead byt kempff1.wav is the example") fs, d = fetch_sample_music() X = d[:44100] X_cq, c_dc, c_nyq, multiscale, shift, window_lens = cqt(X, fs) X_r = icqt(X_cq, c_dc, c_nyq, multiscale, shift, window_lens) SNR = 20 * np.log10(np.linalg.norm(X - X_r) / np.linalg.norm(X)) wavfile.write("cqt_original.wav", fs, soundsc(X)) wavfile.write("cqt_reconstruction.wav", fs, soundsc(X_r)) def run_fft_dct_example(): random_state = np.random.RandomState(1999) fs, d = fetch_sample_speech_fruit() n_fft = 64 X = d[0] X_stft = stft(X, n_fft) X_rr = complex_to_real_view(X_stft) X_dct = fftpack.dct(X_rr, axis=-1, norm='ortho') X_dct_sub = X_dct[1:] - X_dct[:-1] std = X_dct_sub.std(axis=0, keepdims=True) X_dct_sub += .01 * std * random_state.randn( X_dct_sub.shape[0], X_dct_sub.shape[1]) X_dct_unsub = np.cumsum(X_dct_sub, axis=0) X_idct = fftpack.idct(X_dct_unsub, axis=-1, norm='ortho') X_irr = real_to_complex_view(X_idct) X_r = istft(X_irr, n_fft)[:len(X)] SNR = 20 * np.log10(np.linalg.norm(X - X_r) / np.linalg.norm(X)) print(SNR) wavfile.write("fftdct_orig.wav", fs, soundsc(X)) wavfile.write("fftdct_rec.wav", fs, soundsc(X_r)) def harvest_get_downsampled_signal(x, fs, target_fs): 
decimation_ratio = np.round(fs / target_fs) offset = np.ceil(140. / decimation_ratio) * decimation_ratio start_pad = x[0] * np.ones(int(offset), dtype=np.float32) end_pad = x[-1] * np.ones(int(offset), dtype=np.float32) x = np.concatenate((start_pad, x, end_pad), axis=0) if fs < target_fs: raise ValueError("CASE NOT HANDLED IN harvest_get_downsampled_signal") else: y0 = sg.decimate(x, int(decimation_ratio), 3, zero_phase=True) actual_fs = fs / decimation_ratio y = y0[int(offset / decimation_ratio):-int(offset / decimation_ratio)] y = y - np.mean(y) return y, actual_fs def harvest_get_raw_f0_candidates(number_of_frames, boundary_f0_list, y_length, temporal_positions, actual_fs, y_spectrum, f0_floor, f0_ceil): raw_f0_candidates = np.zeros((len(boundary_f0_list), number_of_frames), dtype=np.float32) for i in range(len(boundary_f0_list)): raw_f0_candidates[i, :] = harvest_get_f0_candidate_from_raw_event( boundary_f0_list[i], actual_fs, y_spectrum, y_length, temporal_positions, f0_floor, f0_ceil) return raw_f0_candidates def harvest_nuttall(N): t = np.arange(0, N) * 2 * np.pi / (N - 1) coefs = np.array([0.355768, -0.487396, 0.144232, -0.012604]) window = np.cos(t[:, None].dot(np.array([0., 1., 2., 3.])[None])).dot( coefs[:, None]) # 1D window... return window.ravel() def harvest_get_f0_candidate_from_raw_event(boundary_f0, fs, y_spectrum, y_length, temporal_positions, f0_floor, f0_ceil): filter_length_half = int(np.round(fs / boundary_f0 * 2)) band_pass_filter_base = harvest_nuttall(filter_length_half * 2 + 1) shifter = np.cos(2 * np.pi * boundary_f0 * np.arange(-filter_length_half, filter_length_half + 1) / float(fs)) band_pass_filter = band_pass_filter_base * shifter index_bias = filter_length_half # numerical issues if 32 bit? spectrum_low_pass_filter = np.fft.fft(band_pass_filter.astype("float64"), len(y_spectrum)) filtered_signal = np.real(np.fft.ifft(spectrum_low_pass_filter * y_spectrum)) index_bias = filter_length_half + 1 filtered_signal = filtered_signal[index_bias + np.arange(y_length).astype("int32")] negative_zero_cross = harvest_zero_crossing_engine(filtered_signal, fs) positive_zero_cross = harvest_zero_crossing_engine(-filtered_signal, fs) d_filtered_signal = filtered_signal[1:] - filtered_signal[:-1] peak = harvest_zero_crossing_engine(d_filtered_signal, fs) dip = harvest_zero_crossing_engine(-d_filtered_signal, fs) f0_candidate = harvest_get_f0_candidate_contour(negative_zero_cross, positive_zero_cross, peak, dip, temporal_positions) from IPython import embed; embed() raise ValueError() def harvest_get_f0_candidate_contour(negative_zero_cross_tup, positive_zero_cross_tup, peak_tup, dip_tup, temporal_positions): # 0 is inteval locations # 1 is interval based f0 usable_channel = max(0, len(negative_zero_cross_tup[0]) - 2) usable_channel *= max(0, len(positive_zero_cross_tup[0]) - 2) usable_channel *= max(0, len(peak_tup[0]) - 2) usable_channel *= max(0, len(dip_tup[0]) - 2) if usable_channel > 0: interpolated_f0_list = np.zeros((4, len(temporal_positions))) nz = interp1d(negative_zero_cross_tup[0], negative_zero_cross_tup[1], kind="linear", bounds_error=False, fill_value="extrapolate") pz = interp1d(positive_zero_cross_tup[0], positive_zero_cross_tup[1], kind="linear", bounds_error=False, fill_value="extrapolate") pkz = interp1d(peak_tup[0], peak_tup[1], kind="linear", bounds_error=False, fill_value="extrapolate") dz = interp1d(dip_tup[0], dip_tup[1], kind="linear", bounds_error=False, fill_value="extrapolate") interpolated_f0_list[0, :] = nz(temporal_positions) 
interpolated_f0_list[1, :] = pz(temporal_positions) interpolated_f0_list[2, :] = pkz(temporal_positions) interpolated_f0_list[3, :] = dz(temporal_positions) f0_candidate = np.mean(interpolated_f0_list, axis=0) else: f0_candidate = temporal_positions * 0 return f0_candidate def harvest_zero_crossing_engine(x, fs, debug=False): # negative zero crossing, going from positive to negative x_shift = x.copy() x_shift[:-1] = x_shift[1:] x_shift[-1] = x[-1] # +1 here to avoid edge case at 0 points = np.arange(len(x)) + 1 negative_going_points = points * ((x_shift * x < 0) * (x_shift < x)) edge_list = negative_going_points[negative_going_points > 0] # -1 to correct index fine_edge_list = edge_list - x[edge_list - 1] / (x[edge_list] - x[edge_list - 1]).astype("float32") interval_locations = (fine_edge_list[:-1] + fine_edge_list[1:]) / float(2) / fs interval_based_f0 = float(fs) / (fine_edge_list[1:] - fine_edge_list[:-1]) return interval_locations, interval_based_f0 import matplotlib.pyplot as plt def harvest(x, fs): f0_floor = 71 f0_ceil = 800 frame_period = 5 basic_frame_period = 1 target_fs = 8000 basic_temporal_positions = np.arange(0, len(x) / float(fs), basic_frame_period / float(1000)) channels_in_octave = 40. adjusted_f0_floor = f0_floor * 0.9 adjusted_f0_ceil = f0_ceil * 1.1 boundary_f0_list = np.arange(1, np.ceil(np.log2(adjusted_f0_ceil / adjusted_f0_floor) * channels_in_octave) + 1) / float(channels_in_octave) boundary_f0_list = adjusted_f0_floor * 2.0 ** boundary_f0_list y, actual_fs = harvest_get_downsampled_signal(x, fs, target_fs) fft_size = 2. ** np.ceil(np.log2(len(y) + np.round(fs / f0_floor * 4) + 1)) y_spectrum = np.fft.fft(y, 8192) raw_f0_candidates = harvest_get_raw_f0_candidates( len(basic_temporal_positions), boundary_f0_list, len(y), basic_temporal_positions, actual_fs, y_spectrum, f0_floor, f0_ceil) # raw_f0_candidates = GetRawF0Candidates(... # length(basic_temporal_positions), boundary_f0_list, length(y),... # basic_temporal_positions, actual_fs, y_spectrum, f0_floor, f0_ceil); from IPython import embed; embed() raise ValueError() def run_world_example(): fs, d = wavfile.read("vaiueo2d.wav") d = d.astype("float32") / 2 ** 15 harvest(d, fs) if __name__ == "__main__": run_world_example() """ Trying to run all examples will seg fault on my laptop - probably memory! Comment individually run_phase_reconstruction_example() run_phase_vq_example() run_dct_vq_example() run_fft_vq_example() run_lpc_example() run_cqt_example() run_fft_dct_example() test_all() """
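

# ---------------------------------------------------------------------------
# Illustrative usage sketches (not part of the original examples above).
# Every demo_* function below is new here: the data, sample rates, and
# parameter values are arbitrary illustrations, not recommended settings.
# ---------------------------------------------------------------------------
# Sketch: round-trip a random signal through stft/istft with the "half"
# overlap defaults and a full (two-sided) spectrum. This assumes the overlap
# helpers referenced by stft (overlap / halfoverlap / invert_halfoverlap)
# are present in the module; reconstruction quality depends on them, so the
# interior error is printed as a sanity check rather than asserted.
def demo_stft_roundtrip():
    rng = np.random.RandomState(0)
    d = rng.randn(8192)
    X_f = stft(d.copy(), fftsize=128, step="half", compute_onesided=False)
    d_r = istft(X_f, fftsize=128, step="half", compute_onesided=False)
    n = min(len(d), len(d_r))
    # stft/istft mean-normalize, so compare against the centered signal
    err = np.max(np.abs((d - d.mean())[256:n - 256] - d_r[256:n - 256]))
    print("stft/istft interior max abs error: %.2E" % err)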
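

# Sketch: rolling_mean (correlation with a box filter) and rolling_window
# (a strided view of overlapping frames) agree on a simple moving average.
def demo_rolling():
    x = np.arange(10, dtype="float64")
    rm = rolling_mean(x, 3)
    rw = rolling_window(x, 3).mean(axis=-1)
    assert np.allclose(rm, rw)
    print("3 point moving average:", rm)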
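

# Sketch: lpc_to_frequency reads resonances off the LPC polynomial roots.
# An AR(2) filter with poles at radius r and angle w (arbitrary values here)
# should report a resonance at w radians.
def demo_lpc_to_frequency():
    r, w = 0.95, 0.5
    # poles at r * exp(+/- 1j * w)  ->  a = [1, -2 r cos(w), r^2]
    lp = np.array([[1., -2. * r * np.cos(w), r ** 2]])
    gain = np.ones((1, 1))
    freqs, mags = lpc_to_frequency(lp, gain)
    print("recovered resonance %.3f rad (true %.3f rad)" % (freqs[0, 0], w))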
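

# Sketch: soundsc maps an arbitrarily scaled float signal onto the int16
# range (like MATLAB's soundsc), ready for wavfile.write; the offset and
# scale of the input here are arbitrary.
def demo_soundsc():
    rng = np.random.RandomState(0)
    x = 250. * rng.randn(1000) + 37.
    x_sc = soundsc(x)
    print("soundsc output: dtype %s, min %d, max %d"
          % (x_sc.dtype, x_sc.min(), x_sc.max()))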
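

# Sketch: the view/packing helpers are lossless round trips - complex to
# interleaved real columns and back, magnitude/phase splitting and back,
# and phase to (sin, cos) pairs and back.
def demo_complex_views():
    rng = np.random.RandomState(0)
    arr_c = rng.randn(4, 8) + 1j * rng.randn(4, 8)
    arr_r = complex_to_real_view(arr_c)
    assert arr_r.shape == (4, 16)
    assert np.allclose(real_to_complex_view(arr_r), arr_c)
    rebuilt = abs_and_angle_to_complex(complex_to_abs(arr_c),
                                       complex_to_angle(arr_c))
    assert np.allclose(rebuilt, arr_c)
    ang = complex_to_angle(arr_c)
    sc = angle_to_sin_cos(ang)
    half = sc.shape[1] // 2
    assert np.allclose(sin_cos_to_angle(sc[:, :half], sc[:, half:]), ang)
    print("complex view round trips OK")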
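

# Sketch: hebbian_kmeans on three synthetic Gaussian blobs; the centers,
# cluster count, and epoch count are made-up illustrative values. Each row
# of the learned W should land near one of the true centers.
def demo_hebbian_kmeans():
    rng = np.random.RandomState(1999)
    centers = np.array([[0., 0.], [5., 5.], [-5., 5.]])
    X = np.vstack([c + .1 * rng.randn(200, 2) for c in centers])
    rng.shuffle(X)
    W = hebbian_kmeans(X, n_clusters=3, n_epochs=25, random_state=rng,
                       verbose=False)
    print("learned centers (compare against %s):" % centers.tolist())
    print(np.round(W, 2))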
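

# Sketch: polyphase_lowpass as an anti-aliased 2x decimator. A low tone
# should survive while a tone near Nyquist is attenuated instead of
# aliasing, unlike naive slicing; the tone frequencies here are arbitrary.
def demo_polyphase_lowpass():
    fs = 8000.
    t = np.arange(int(fs)) / fs
    low = np.sin(2 * np.pi * 100. * t)
    high = np.sin(2 * np.pi * 3900. * t)
    y = polyphase_lowpass(low + high, downsample=2)
    naive = (low + high)[::2]
    print("polyphase rms %.3f vs naive rms %.3f (low tone alone %.3f)"
          % (np.sqrt(np.mean(y ** 2)), np.sqrt(np.mean(naive ** 2)),
             np.sqrt(np.mean(low ** 2))))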
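

# Sketch: window() returns overlapping frames as a strided view of the
# input; with window_size=4 and window_step=2 a 10 point ramp yields four
# half-overlapped frames.
def demo_window():
    arr = np.arange(10.)
    frames = window(arr, window_size=4, window_step=2)
    print("window() frames, shape %s:" % (frames.shape,))
    print(frames)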
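

# Sketch: xcorr_offset estimates the relative lag between two frames by
# cross correlation. Identical frames should report an offset near zero,
# and a small circular shift should move the estimate by about that many
# samples (the sign follows the convention of the implementation above).
def demo_xcorr_offset():
    rng = np.random.RandomState(0)
    frame = rng.randn(256)
    print("offset(frame, frame) =", xcorr_offset(frame, frame))
    print("offset(frame, roll(frame, 4)) =",
          xcorr_offset(frame, np.roll(frame, 4)))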
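

# Sketch: harvest_zero_crossing_engine returns (interval locations,
# interval based f0); on a pure sine the mean f0 estimate should sit at the
# true frequency. The 100 Hz tone and 8 kHz rate are arbitrary choices.
def demo_zero_crossing_f0():
    fs = 8000.
    t = np.arange(int(2 * fs)) / fs
    x = np.sin(2 * np.pi * 100. * t)
    locs, f0_est = harvest_zero_crossing_engine(x, fs)
    print("zero crossing f0 estimate: %.2f Hz (true 100 Hz)"
          % np.mean(f0_est))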