-
-
Save gvanslyk/3c804205f6a0a3e8fddd2d01bdd8a2eb to your computer and use it in GitHub Desktop.
Audio tools for numpy/python. Constant work in progress.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # License: BSD 3-clause | |
| # Authors: Kyle Kastner | |
| import numpy as np | |
| # Do numpy version check for <= 1.9 ! Something crazy going on in 1.10 | |
| if int(np.__version__.split(".")[1]) >= 10: | |
| print("Only numpy <= 1.9 currently supported! There is a " | |
| "numerical error in one of the numpy 1.10 routines. " | |
| "Hopefully this will be debugged an corrected soon. " | |
| "For the intrepid, the error can be seen by running" | |
| "run_phase_reconstruction()") | |
| from numpy.lib.stride_tricks import as_strided | |
| import scipy.signal as sg | |
| from scipy.cluster.vq import vq | |
| from scipy import linalg, fftpack | |
| from numpy.testing import assert_almost_equal | |
| from scipy.linalg import svd | |
| from scipy.io import wavfile | |
| from scipy.signal import firwin | |
| import zipfile | |
| import tarfile | |
| import os | |
| import copy | |
| try: | |
| import urllib.request as urllib # for backwards compatibility | |
| except ImportError: | |
| import urllib2 as urllib | |
| def download(url, server_fname, local_fname=None, progress_update_percentage=5, | |
| bypass_certificate_check=False): | |
| """ | |
| An internet download utility modified from | |
| http://stackoverflow.com/questions/22676/ | |
| how-do-i-download-a-file-over-http-using-python/22776#22776 | |
| """ | |
| if bypass_certificate_check: | |
| import ssl | |
| ctx = ssl.create_default_context() | |
| ctx.check_hostname = False | |
| ctx.verify_mode = ssl.CERT_NONE | |
| u = urllib.urlopen(url, context=ctx) | |
| else: | |
| u = urllib.urlopen(url) | |
| if local_fname is None: | |
| local_fname = server_fname | |
| full_path = local_fname | |
| meta = u.info() | |
| with open(full_path, 'wb') as f: | |
| try: | |
| file_size = int(meta.get("Content-Length")) | |
| except TypeError: | |
| print("WARNING: Cannot get file size, displaying bytes instead!") | |
| file_size = 100 | |
| print("Downloading: %s Bytes: %s" % (server_fname, file_size)) | |
| file_size_dl = 0 | |
| block_sz = int(1E7) | |
| p = 0 | |
| while True: | |
| buffer = u.read(block_sz) | |
| if not buffer: | |
| break | |
| file_size_dl += len(buffer) | |
| f.write(buffer) | |
| if (file_size_dl * 100. / file_size) > p: | |
| status = r"%10d [%3.2f%%]" % (file_size_dl, file_size_dl * | |
| 100. / file_size) | |
| print(status) | |
| p += progress_update_percentage | |
| def fetch_sample_speech_tapestry(): | |
| url = "https://www.dropbox.com/s/qte66a7haqspq2g/tapestry.wav?dl=1" | |
| wav_path = "tapestry.wav" | |
| if not os.path.exists(wav_path): | |
| download(url, wav_path) | |
| fs, d = wavfile.read(wav_path) | |
| d = d.astype('float32') / (2 ** 15) | |
| # file is stereo? - just choose one channel | |
| return fs, d | |
| def fetch_sample_file(wav_path): | |
| if not os.path.exists(wav_path): | |
| raise ValueError("Unable to find file at path %s" % wav_path) | |
| fs, d = wavfile.read(wav_path) | |
| d = d.astype('float32') / (2 ** 15) | |
| # file is stereo - just choose one channel | |
| if len(d.shape) > 1: | |
| d = d[:, 0] | |
| return fs, d | |
| def fetch_sample_music(): | |
| url = "http://www.music.helsinki.fi/tmt/opetus/uusmedia/esim/" | |
| url += "a2002011001-e02-16kHz.wav" | |
| wav_path = "test.wav" | |
| if not os.path.exists(wav_path): | |
| download(url, wav_path) | |
| fs, d = wavfile.read(wav_path) | |
| d = d.astype('float32') / (2 ** 15) | |
| # file is stereo - just choose one channel | |
| d = d[:, 0] | |
| return fs, d | |
| def fetch_sample_speech_fruit(n_samples=None): | |
| url = 'https://dl.dropboxusercontent.com/u/15378192/audio.tar.gz' | |
| wav_path = "audio.tar.gz" | |
| if not os.path.exists(wav_path): | |
| download(url, wav_path) | |
| tf = tarfile.open(wav_path) | |
| wav_names = [fname for fname in tf.getnames() | |
| if ".wav" in fname.split(os.sep)[-1]] | |
| speech = [] | |
| print("Loading speech files...") | |
| for wav_name in wav_names[:n_samples]: | |
| f = tf.extractfile(wav_name) | |
| fs, d = wavfile.read(f) | |
| d = d.astype('float32') / (2 ** 15) | |
| speech.append(d) | |
| return fs, speech | |
| def fetch_sample_speech_eustace(n_samples=None): | |
| """ | |
| http://www.cstr.ed.ac.uk/projects/eustace/download.html | |
| """ | |
| # data | |
| url = "http://www.cstr.ed.ac.uk/projects/eustace/down/eustace_wav.zip" | |
| wav_path = "eustace_wav.zip" | |
| if not os.path.exists(wav_path): | |
| download(url, wav_path) | |
| # labels | |
| url = "http://www.cstr.ed.ac.uk/projects/eustace/down/eustace_labels.zip" | |
| labels_path = "eustace_labels.zip" | |
| if not os.path.exists(labels_path): | |
| download(url, labels_path) | |
| # Read wavfiles | |
| # 16 kHz wav | |
| zf = zipfile.ZipFile(wav_path, 'r') | |
| wav_names = [fname for fname in zf.namelist() | |
| if ".wav" in fname.split(os.sep)[-1]] | |
| fs = 16000 | |
| speech = [] | |
| print("Loading speech files...") | |
| for wav_name in wav_names[:n_samples]: | |
| wav_str = zf.read(wav_name) | |
| d = np.frombuffer(wav_str, dtype=np.int16) | |
| d = d.astype('float32') / (2 ** 15) | |
| speech.append(d) | |
| zf = zipfile.ZipFile(labels_path, 'r') | |
| label_names = [fname for fname in zf.namelist() | |
| if ".lab" in fname.split(os.sep)[-1]] | |
| labels = [] | |
| print("Loading label files...") | |
| for label_name in label_names[:n_samples]: | |
| label_file_str = zf.read(label_name) | |
| labels.append(label_file_str) | |
| return fs, speech | |
| def stft(X, fftsize=128, step="half", mean_normalize=True, real=False, | |
| compute_onesided=True): | |
| """ | |
| Compute STFT for 1D real valued input X | |
| """ | |
| if real: | |
| local_fft = np.fft.rfft | |
| cut = -1 | |
| else: | |
| local_fft = np.fft.fft | |
| cut = None | |
| if compute_onesided: | |
| cut = fftsize // 2 | |
| if mean_normalize: | |
| X -= X.mean() | |
| if step == "half": | |
| X = halfoverlap(X, fftsize) | |
| else: | |
| X = overlap(X, fftsize, step) | |
| size = fftsize | |
| win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1)) | |
| X = X * win[None] | |
| X = local_fft(X)[:, :cut] | |
| return X | |
| def istft(X, fftsize=128, mean_normalize=True, real=False, | |
| compute_onesided=True): | |
| """ | |
| Compute ISTFT for STFT transformed X | |
| """ | |
| if real: | |
| local_ifft = np.fft.irfft | |
| X_pad = np.zeros((X.shape[0], X.shape[1] + 1)) + 0j | |
| X_pad[:, :-1] = X | |
| X = X_pad | |
| else: | |
| local_ifft = np.fft.ifft | |
| if compute_onesided: | |
| X_pad = np.zeros((X.shape[0], 2 * X.shape[1])) + 0j | |
| X_pad[:, :fftsize // 2] = X | |
| X_pad[:, fftsize // 2:] = 0 | |
| X = X_pad | |
| X = local_ifft(X).astype("float64") | |
| X = invert_halfoverlap(X) | |
| if mean_normalize: | |
| X -= np.mean(X) | |
| return X | |
| def mdct_slow(X, dctsize=128): | |
| M = dctsize | |
| N = 2 * dctsize | |
| N_0 = (M + 1) / 2 | |
| X = halfoverlap(X, N) | |
| X = sine_window(X) | |
| n, k = np.meshgrid(np.arange(N), np.arange(M)) | |
| # Use transpose due to "samples as rows" convention | |
| tf = np.cos(np.pi * (n + N_0) * (k + 0.5) / M).T | |
| return np.dot(X, tf) | |
| def imdct_slow(X, dctsize=128): | |
| M = dctsize | |
| N = 2 * dctsize | |
| N_0 = (M + 1) / 2 | |
| N_4 = N / 4 | |
| n, k = np.meshgrid(np.arange(N), np.arange(M)) | |
| # inverse *is not* transposed | |
| tf = np.cos(np.pi * (n + N_0) * (k + 0.5) / M) | |
| X_r = np.dot(X, tf) / N_4 | |
| X_r = sine_window(X_r) | |
| X = invert_halfoverlap(X_r) | |
| return X | |
| def nsgcwin(fmin, fmax, n_bins, fs, signal_len, gamma): | |
| """ | |
| Nonstationary Gabor window calculation | |
| References | |
| ---------- | |
| Velasco G. A., Holighaus N., Dörfler M., Grill T. | |
| Constructing an invertible constant-Q transform with nonstationary Gabor | |
| frames, Proceedings of the 14th International Conference on Digital | |
| Audio Effects (DAFx 11), Paris, France, 2011 | |
| Holighaus N., Dörfler M., Velasco G. A. and Grill T. | |
| A framework for invertible, real-time constant-Q transforms, submitted. | |
| Original matlab code copyright follows: | |
| AUTHOR(s) : Monika Dörfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011 | |
| COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA | |
| http://nuhag.eu/ | |
| Permission is granted to modify and re-distribute this | |
| code in any manner as long as this notice is preserved. | |
| All standard disclaimers apply. | |
| """ | |
| # use a hanning window | |
| # no fractional shifts | |
| fftres = fs / signal_len | |
| fmin = float(fmin) | |
| fmax = float(fmax) | |
| gamma = float(gamma) | |
| nyq = fs / 2. | |
| b = np.floor(n_bins * np.log2(fmax / fmin)) | |
| fbas = fmin * 2 ** (np.arange(b + 1) / float(n_bins)) | |
| Q = 2 ** (1. / n_bins) - 2 ** (-1. / n_bins) | |
| cqtbw = Q * fbas + gamma | |
| cqtbw = cqtbw.ravel() | |
| maxidx = np.where(fbas + cqtbw / 2. > nyq)[0] | |
| if len(maxidx) > 0: | |
| # replicate bug in MATLAB version... | |
| # or is it a feature | |
| if sum(maxidx) == 0: | |
| first = len(cqtbw) - 1 | |
| else: | |
| first = maxidx[0] | |
| fbas = fbas[:first] | |
| cqtbw = cqtbw[:first] | |
| minidx = np.where(fbas - cqtbw / 2. < 0)[0] | |
| if len(minidx) > 0: | |
| fbas = fbas[minidx[-1]+1:] | |
| cqtbw = cqtbw[minidx[-1]+1:] | |
| fbas_len = len(fbas) | |
| fbas_new = np.zeros((2 * (len(fbas) + 1))) | |
| fbas_new[1:len(fbas) + 1] = fbas | |
| fbas = fbas_new | |
| fbas[fbas_len + 1] = nyq | |
| fbas[fbas_len + 2:] = fs - fbas[1:fbas_len + 1][::-1] | |
| bw = np.zeros_like(fbas) | |
| bw[0] = 2 * fmin | |
| bw[1:len(cqtbw) + 1] = cqtbw | |
| bw[len(cqtbw) + 1] = fbas[fbas_len + 2] - fbas[fbas_len] | |
| bw[-len(cqtbw):] = cqtbw[::-1] | |
| bw = bw / fftres | |
| fbas = fbas / fftres | |
| posit = np.zeros_like(fbas) | |
| posit[:fbas_len + 2] = np.floor(fbas[:fbas_len + 2]) | |
| posit[fbas_len + 2:] = np.ceil(fbas[fbas_len + 2:]) | |
| base_shift = -posit[-1] % signal_len | |
| shift = np.zeros_like(posit).astype("int32") | |
| shift[1:] = (posit[1:] - posit[:-1]).astype("int32") | |
| shift[0] = base_shift | |
| bw = np.round(bw) | |
| bwfac = 1 | |
| M = bw | |
| min_win = 4 | |
| for ii in range(len(bw)): | |
| if bw[ii] < min_win: | |
| bw[ii] = min_win | |
| M[ii] = bw[ii] | |
| def _win(numel): | |
| if numel % 2 == 0: | |
| s1 = np.arange(0, .5, 1. / numel) | |
| if len(s1) != numel // 2: | |
| # edge case with small floating point numbers... | |
| s1 = s1[:-1] | |
| s2 = np.arange(-.5, 0, 1. / numel) | |
| if len(s2) != numel // 2: | |
| # edge case with small floating point numbers... | |
| s2 = s2[:-1] | |
| x = np.concatenate((s1, s2)) | |
| else: | |
| s1 = np.arange(0, .5, 1. / numel) | |
| s2 = np.arange(-.5 + .5 / numel, 0, 1. / numel) | |
| if len(s2) != numel // 2: # assume integer truncate 27 // 2 = 13 | |
| s2 = s2[:-1] | |
| x = np.concatenate((s1, s2)) | |
| assert len(x) == numel | |
| g = .5 + .5 * np.cos(2 * np.pi * x) | |
| return g | |
| multiscale = [_win(bi) for bi in bw] | |
| bw = bwfac * np.ceil(M / bwfac) | |
| for kk in [0, fbas_len + 1]: | |
| if M[kk] > M[kk + 1]: | |
| multiscale[kk] = np.ones(M[kk]).astype(multiscale[0].dtype) | |
| i1 = np.floor(M[kk] / 2) - np.floor(M[kk + 1] / 2) | |
| i2 = np.floor(M[kk] / 2) + np.ceil(M[kk + 1] / 2) | |
| # Very rarely, gets an off by 1 error? Seems to be at the end... | |
| # for now, slice | |
| multiscale[kk][i1:i2] = _win(M[kk + 1]) | |
| multiscale[kk] = multiscale[kk] / np.sqrt(M[kk]) | |
| return multiscale, shift, M | |
| def nsgtf_real(X, multiscale, shift, window_lens): | |
| """ | |
| Nonstationary Gabor Transform for real values | |
| References | |
| ---------- | |
| Velasco G. A., Holighaus N., Dörfler M., Grill T. | |
| Constructing an invertible constant-Q transform with nonstationary Gabor | |
| frames, Proceedings of the 14th International Conference on Digital | |
| Audio Effects (DAFx 11), Paris, France, 2011 | |
| Holighaus N., Dörfler M., Velasco G. A. and Grill T. | |
| A framework for invertible, real-time constant-Q transforms, submitted. | |
| Original matlab code copyright follows: | |
| AUTHOR(s) : Monika Dörfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011 | |
| COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA | |
| http://nuhag.eu/ | |
| Permission is granted to modify and re-distribute this | |
| code in any manner as long as this notice is preserved. | |
| All standard disclaimers apply. | |
| """ | |
| # This will break with multchannel input | |
| signal_len = len(X) | |
| N = len(shift) | |
| X_fft = np.fft.fft(X) | |
| fill = np.sum(shift) - signal_len | |
| if fill > 0: | |
| X_fft_tmp = np.zeros((signal_len + shift)) | |
| X_fft_tmp[:len(X_fft)] = X_fft | |
| X_fft = X_fft_tmp | |
| posit = np.cumsum(shift) - shift[0] | |
| scale_lens = np.array([len(m) for m in multiscale]) | |
| N = np.where(posit - np.floor(scale_lens) <= (signal_len + fill) / 2)[0][-1] | |
| c = [] | |
| # c[0] is almost exact | |
| for ii in range(N): | |
| idx_l = np.arange(np.ceil(scale_lens[ii] / 2), scale_lens[ii]) | |
| idx_r = np.arange(np.ceil(scale_lens[ii] / 2)) | |
| idx = np.concatenate((idx_l, idx_r)) | |
| idx = idx.astype("int32") | |
| subwin_range = posit[ii] + np.arange(-np.floor(scale_lens[ii] / 2), | |
| np.ceil(scale_lens[ii] / 2)) | |
| win_range = subwin_range % (signal_len + fill) | |
| win_range = win_range.astype("int32") | |
| if window_lens[ii] < scale_lens[ii]: | |
| raise ValueError("Not handling 'not enough channels' case") | |
| else: | |
| temp = np.zeros((window_lens[ii],)).astype(X_fft.dtype) | |
| temp_idx_l = np.arange(len(temp) - np.floor(scale_lens[ii] / 2), | |
| len(temp)) | |
| temp_idx_r = np.arange(np.ceil(scale_lens[ii] / 2)) | |
| temp_idx = np.concatenate((temp_idx_l, temp_idx_r)) | |
| temp_idx = temp_idx.astype("int32") | |
| temp[temp_idx] = X_fft[win_range] * multiscale[ii][idx] | |
| fs_new_bins = window_lens[ii] | |
| fk_bins = posit[ii] | |
| displace = fk_bins - np.floor(fk_bins / fs_new_bins) * fs_new_bins | |
| displace = displace.astype("int32") | |
| temp = np.roll(temp, displace) | |
| c.append(np.fft.ifft(temp)) | |
| if 0: | |
| # cell2mat concatenation | |
| c = np.concatenate(c) | |
| return c | |
| def nsdual(multiscale, shift, window_lens): | |
| """ | |
| Calculation of nonstationary inverse gabor filters | |
| References | |
| ---------- | |
| Velasco G. A., Holighaus N., Dörfler M., Grill T. | |
| Constructing an invertible constant-Q transform with nonstationary Gabor | |
| frames, Proceedings of the 14th International Conference on Digital | |
| Audio Effects (DAFx 11), Paris, France, 2011 | |
| Holighaus N., Dörfler M., Velasco G. A. and Grill T. | |
| A framework for invertible, real-time constant-Q transforms, submitted. | |
| Original matlab code copyright follows: | |
| AUTHOR(s) : Monika Dörfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011 | |
| COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA | |
| http://nuhag.eu/ | |
| Permission is granted to modify and re-distribute this | |
| code in any manner as long as this notice is preserved. | |
| All standard disclaimers apply. | |
| """ | |
| N = len(shift) | |
| posit = np.cumsum(shift) | |
| seq_len = posit[-1] | |
| posit = posit - shift[0] | |
| diagonal = np.zeros((seq_len,)) | |
| win_range = [] | |
| for ii in range(N): | |
| filt_len = len(multiscale[ii]) | |
| idx = np.arange(-np.floor(filt_len / 2), np.ceil(filt_len / 2)) | |
| win_range.append((posit[ii] + idx) % seq_len) | |
| subdiag = window_lens[ii] * np.fft.fftshift(multiscale[ii]) ** 2 | |
| ind = win_range[ii].astype(np.int) | |
| diagonal[ind] = diagonal[ind] + subdiag | |
| dual_multiscale = multiscale | |
| for ii in range(N): | |
| ind = win_range[ii].astype(np.int) | |
| dual_multiscale[ii] = np.fft.ifftshift( | |
| np.fft.fftshift(dual_multiscale[ii]) / diagonal[ind]) | |
| return dual_multiscale | |
| def nsgitf_real(c, c_dc, c_nyq, multiscale, shift): | |
| """ | |
| Nonstationary Inverse Gabor Transform on real valued signal | |
| References | |
| ---------- | |
| Velasco G. A., Holighaus N., Dörfler M., Grill T. | |
| Constructing an invertible constant-Q transform with nonstationary Gabor | |
| frames, Proceedings of the 14th International Conference on Digital | |
| Audio Effects (DAFx 11), Paris, France, 2011 | |
| Holighaus N., Dörfler M., Velasco G. A. and Grill T. | |
| A framework for invertible, real-time constant-Q transforms, submitted. | |
| Original matlab code copyright follows: | |
| AUTHOR(s) : Monika Dörfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011 | |
| COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA | |
| http://nuhag.eu/ | |
| Permission is granted to modify and re-distribute this | |
| code in any manner as long as this notice is preserved. | |
| All standard disclaimers apply. | |
| """ | |
| c_l = [] | |
| c_l.append(c_dc) | |
| c_l.extend([ci for ci in c]) | |
| c_l.append(c_nyq) | |
| posit = np.cumsum(shift) | |
| seq_len = posit[-1] | |
| posit -= shift[0] | |
| out = np.zeros((seq_len,)).astype(c_l[1].dtype) | |
| for ii in range(len(c_l)): | |
| filt_len = len(multiscale[ii]) | |
| win_range = posit[ii] + np.arange(-np.floor(filt_len / 2), | |
| np.ceil(filt_len / 2)) | |
| win_range = (win_range % seq_len).astype(np.int) | |
| temp = np.fft.fft(c_l[ii]) * len(c_l[ii]) | |
| fs_new_bins = len(c_l[ii]) | |
| fk_bins = posit[ii] | |
| displace = int(fk_bins - np.floor(fk_bins / fs_new_bins) * fs_new_bins) | |
| temp = np.roll(temp, -displace) | |
| l = np.arange(len(temp) - np.floor(filt_len / 2), len(temp)) | |
| r = np.arange(np.ceil(filt_len / 2)) | |
| temp_idx = (np.concatenate((l, r)) % len(temp)).astype(np.int) | |
| temp = temp[temp_idx] | |
| lf = np.arange(filt_len - np.floor(filt_len / 2), filt_len) | |
| rf = np.arange(np.ceil(filt_len / 2)) | |
| filt_idx = np.concatenate((lf, rf)).astype(np.int) | |
| m = multiscale[ii][filt_idx] | |
| out[win_range] = out[win_range] + m * temp | |
| nyq_bin = np.floor(seq_len / 2) + 1 | |
| out_idx = np.arange( | |
| nyq_bin - np.abs(1 - seq_len % 2) - 1, 0, -1).astype(np.int) | |
| out[nyq_bin:] = np.conj(out[out_idx]) | |
| t_out = np.real(np.fft.ifft(out)).astype(np.float64) | |
| return t_out | |
| def cqt(X, fs, n_bins=48, fmin=27.5, fmax="nyq", gamma=20): | |
| """ | |
| Constant Q Transform | |
| References | |
| ---------- | |
| Velasco G. A., Holighaus N., Dörfler M., Grill T. | |
| Constructing an invertible constant-Q transform with nonstationary Gabor | |
| frames, Proceedings of the 14th International Conference on Digital | |
| Audio Effects (DAFx 11), Paris, France, 2011 | |
| Holighaus N., Dörfler M., Velasco G. A. and Grill T. | |
| A framework for invertible, real-time constant-Q transforms, submitted. | |
| Original matlab code copyright follows: | |
| AUTHOR(s) : Monika Dörfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011 | |
| COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA | |
| http://nuhag.eu/ | |
| Permission is granted to modify and re-distribute this | |
| code in any manner as long as this notice is preserved. | |
| All standard disclaimers apply. | |
| """ | |
| if fmax == "nyq": | |
| fmax = fs / 2. | |
| multiscale, shift, window_lens = nsgcwin(fmin, fmax, n_bins, fs, | |
| len(X), gamma) | |
| fbas = fs * np.cumsum(shift[1:]) / len(X) | |
| fbas = fbas[:len(window_lens) // 2 - 1] | |
| bins = window_lens.shape[0] // 2 - 1 | |
| window_lens[1:bins + 1] = window_lens[bins + 2] | |
| window_lens[bins + 2:] = window_lens[1:bins + 1][::-1] | |
| norm = 2. * window_lens[:bins + 2] / float(len(X)) | |
| norm = np.concatenate((norm, norm[1:-1][::-1])) | |
| multiscale = [norm[ii] * multiscale[ii] for ii in range(2 * (bins + 1))] | |
| c = nsgtf_real(X, multiscale, shift, window_lens) | |
| c_dc = c[0] | |
| c_nyq = c[-1] | |
| c_sub = c[1:-1] | |
| c = np.vstack(c_sub) | |
| return c, c_dc, c_nyq, multiscale, shift, window_lens | |
| def icqt(X_cq, c_dc, c_nyq, multiscale, shift, window_lens): | |
| """ | |
| Inverse constant Q Transform | |
| References | |
| ---------- | |
| Velasco G. A., Holighaus N., Dörfler M., Grill T. | |
| Constructing an invertible constant-Q transform with nonstationary Gabor | |
| frames, Proceedings of the 14th International Conference on Digital | |
| Audio Effects (DAFx 11), Paris, France, 2011 | |
| Holighaus N., Dörfler M., Velasco G. A. and Grill T. | |
| A framework for invertible, real-time constant-Q transforms, submitted. | |
| Original matlab code copyright follows: | |
| AUTHOR(s) : Monika Dörfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011 | |
| COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA | |
| http://nuhag.eu/ | |
| Permission is granted to modify and re-distribute this | |
| code in any manner as long as this notice is preserved. | |
| All standard disclaimers apply. | |
| """ | |
| new_multiscale = nsdual(multiscale, shift, window_lens) | |
| X = nsgitf_real(X_cq, c_dc, c_nyq, new_multiscale, shift) | |
| return X | |
| def rolling_mean(X, window_size): | |
| w = 1.0 / window_size * np.ones((window_size)) | |
| return np.correlate(X, w, 'valid') | |
| def voiced_unvoiced(X, window_size=256, window_step=128, copy=True): | |
| """ | |
| Voiced unvoiced detection from a raw signal | |
| Based on code from: | |
| https://www.clear.rice.edu/elec532/PROJECTS96/lpc/code.html | |
| Other references: | |
| http://www.seas.ucla.edu/spapl/code/harmfreq_MOLRT_VAD.m | |
| Parameters | |
| ---------- | |
| X : ndarray | |
| Raw input signal | |
| window_size : int, optional (default=256) | |
| The window size to use, in samples. | |
| window_step : int, optional (default=128) | |
| How far the window steps after each calculation, in samples. | |
| copy : bool, optional (default=True) | |
| Whether to make a copy of the input array or allow in place changes. | |
| """ | |
| X = np.array(X, copy=copy) | |
| if len(X.shape) < 2: | |
| X = X[None] | |
| n_points = X.shape[1] | |
| n_windows = n_points // window_step | |
| # Padding | |
| pad_sizes = [(window_size - window_step) // 2, | |
| window_size - window_step // 2] | |
| # TODO: Handling for odd window sizes / steps | |
| X = np.hstack((np.zeros((X.shape[0], pad_sizes[0])), X, | |
| np.zeros((X.shape[0], pad_sizes[1])))) | |
| clipping_factor = 0.6 | |
| b, a = sg.butter(10, np.pi * 9 / 40) | |
| voiced_unvoiced = np.zeros((n_windows, 1)) | |
| period = np.zeros((n_windows, 1)) | |
| for window in range(max(n_windows - 1, 1)): | |
| XX = X.ravel()[window * window_step + np.arange(window_size)] | |
| XX *= sg.hamming(len(XX)) | |
| XX = sg.lfilter(b, a, XX) | |
| left_max = np.max(np.abs(XX[:len(XX) // 3])) | |
| right_max = np.max(np.abs(XX[-len(XX) // 3:])) | |
| clip_value = clipping_factor * np.min([left_max, right_max]) | |
| XX_clip = np.clip(XX, clip_value, -clip_value) | |
| XX_corr = np.correlate(XX_clip, XX_clip, mode='full') | |
| center = np.argmax(XX_corr) | |
| right_XX_corr = XX_corr[center:] | |
| prev_window = max([window - 1, 0]) | |
| if voiced_unvoiced[prev_window] > 0: | |
| # Want it to be harder to turn off than turn on | |
| strength_factor = .29 | |
| else: | |
| strength_factor = .3 | |
| start = np.where(right_XX_corr < .3 * XX_corr[center])[0] | |
| # 20 is hardcoded but should depend on samplerate? | |
| start = np.max([20, start[0]]) | |
| search_corr = right_XX_corr[start:] | |
| index = np.argmax(search_corr) | |
| second_max = search_corr[index] | |
| if (second_max > strength_factor * XX_corr[center]): | |
| voiced_unvoiced[window] = 1 | |
| period[window] = start + index - 1 | |
| else: | |
| voiced_unvoiced[window] = 0 | |
| period[window] = 0 | |
| return np.array(voiced_unvoiced), np.array(period) | |
| def lpc_analysis(X, order=8, window_step=128, window_size=2 * 128, | |
| emphasis=0.9, voiced_start_threshold=.9, | |
| voiced_stop_threshold=.6, truncate=False, copy=True): | |
| """ | |
| Extract LPC coefficients from a signal | |
| Based on code from: | |
| http://labrosa.ee.columbia.edu/matlab/sws/ | |
| Parameters | |
| ---------- | |
| X : ndarray | |
| Signals to extract LPC coefficients from | |
| order : int, optional (default=8) | |
| Order of the LPC coefficients. For speech, use the general rule that the | |
| order is two times the expected number of formants plus 2. | |
| This can be formulated as 2 + 2 * (fs // 2000). For approx. signals | |
| with fs = 7000, this is 8 coefficients - 2 + 2 * (7000 // 2000). | |
| window_step : int, optional (default=128) | |
| The size (in samples) of the space between each window | |
| window_size : int, optional (default=2 * 128) | |
| The size of each window (in samples) to extract coefficients over | |
| emphasis : float, optional (default=0.9) | |
| The emphasis coefficient to use for filtering | |
| voiced_start_threshold : float, optional (default=0.9) | |
| Upper power threshold for estimating when speech has started | |
| voiced_stop_threshold : float, optional (default=0.6) | |
| Lower power threshold for estimating when speech has stopped | |
| truncate : bool, optional (default=False) | |
| Whether to cut the data at the last window or do zero padding. | |
| copy : bool, optional (default=True) | |
| Whether to copy the input X or modify in place | |
| Returns | |
| ------- | |
| lp_coefficients : ndarray | |
| lp coefficients to describe the frame | |
| per_frame_gain : ndarray | |
| calculated gain for each frame | |
| residual_excitation : ndarray | |
| leftover energy which is not described by lp coefficents and gain | |
| voiced_frames : ndarray | |
| array of [0, 1] values which holds voiced/unvoiced decision for each | |
| frame. | |
| References | |
| ---------- | |
| D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab", | |
| Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/ | |
| """ | |
| X = np.array(X, copy=copy) | |
| if len(X.shape) < 2: | |
| X = X[None] | |
| n_points = X.shape[1] | |
| n_windows = n_points // window_step | |
| if not truncate: | |
| pad_sizes = [(window_size - window_step) // 2, | |
| window_size - window_step // 2] | |
| # TODO: Handling for odd window sizes / steps | |
| X = np.hstack((np.zeros((X.shape[0], pad_sizes[0])), X, | |
| np.zeros((X.shape[0], pad_sizes[1])))) | |
| else: | |
| pad_sizes = [0, 0] | |
| X = X[0, :n_windows * window_step] | |
| lp_coefficients = np.zeros((n_windows, order + 1)) | |
| per_frame_gain = np.zeros((n_windows, 1)) | |
| residual_excitation = np.zeros( | |
| ((n_windows - 1) * window_step + window_size)) | |
| # Pre-emphasis high-pass filter | |
| X = sg.lfilter([1, -emphasis], 1, X) | |
| # stride_tricks.as_strided? | |
| autocorr_X = np.zeros((n_windows, 2 * window_size - 1)) | |
| for window in range(max(n_windows - 1, 1)): | |
| XX = X.ravel()[window * window_step + np.arange(window_size)] | |
| WXX = XX * sg.hanning(window_size) | |
| autocorr_X[window] = np.correlate(WXX, WXX, mode='full') | |
| center = np.argmax(autocorr_X[window]) | |
| RXX = autocorr_X[window, | |
| np.arange(center, window_size + order)] | |
| R = linalg.toeplitz(RXX[:-1]) | |
| solved_R = linalg.pinv(R).dot(RXX[1:]) | |
| filter_coefs = np.hstack((1, -solved_R)) | |
| residual_signal = sg.lfilter(filter_coefs, 1, WXX) | |
| gain = np.sqrt(np.mean(residual_signal ** 2)) | |
| lp_coefficients[window] = filter_coefs | |
| per_frame_gain[window] = gain | |
| assign_range = window * window_step + np.arange(window_size) | |
| residual_excitation[assign_range] += residual_signal / gain | |
| # Throw away first part in overlap mode for proper synthesis | |
| residual_excitation = residual_excitation[pad_sizes[0]:] | |
| return lp_coefficients, per_frame_gain, residual_excitation | |
| def lpc_synthesis(lp_coefficients, per_frame_gain, residual_excitation=None, | |
| voiced_frames=None, window_step=128, emphasis=0.9): | |
| """ | |
| Synthesize a signal from LPC coefficients | |
| Based on code from: | |
| http://labrosa.ee.columbia.edu/matlab/sws/ | |
| http://web.uvic.ca/~tyoon/resource/auditorytoolbox/auditorytoolbox/synlpc.html | |
| Parameters | |
| ---------- | |
| lp_coefficients : ndarray | |
| Linear prediction coefficients | |
| per_frame_gain : ndarray | |
| Gain coefficients | |
| residual_excitation : ndarray or None, optional (default=None) | |
| Residual excitations. If None, this will be synthesized with white noise | |
| voiced_frames : ndarray or None, optional (default=None) | |
| Voiced frames. If None, all frames assumed to be voiced. | |
| window_step : int, optional (default=128) | |
| The size (in samples) of the space between each window | |
| emphasis : float, optional (default=0.9) | |
| The emphasis coefficient to use for filtering | |
| overlap_add : bool, optional (default=True) | |
| What type of processing to use when joining windows | |
| copy : bool, optional (default=True) | |
| Whether to copy the input X or modify in place | |
| Returns | |
| ------- | |
| synthesized : ndarray | |
| Sound vector synthesized from input arguments | |
| References | |
| ---------- | |
| D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab", | |
| Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/ | |
| """ | |
| # TODO: Incorporate better synthesis from | |
| # http://eecs.oregonstate.edu/education/docs/ece352/CompleteManual.pdf | |
| window_size = 2 * window_step | |
| [n_windows, order] = lp_coefficients.shape | |
| n_points = (n_windows + 1) * window_step | |
| n_excitation_points = n_points + window_step + window_step // 2 | |
| random_state = np.random.RandomState(1999) | |
| if residual_excitation is None: | |
| # Need to generate excitation | |
| if voiced_frames is None: | |
| # No voiced/unvoiced info, so just use randn | |
| voiced_frames = np.ones((lp_coefficients.shape[0], 1)) | |
| residual_excitation = np.zeros((n_excitation_points)) | |
| f, m = lpc_to_frequency(lp_coefficients, per_frame_gain) | |
| t = np.linspace(0, 1, window_size, endpoint=False) | |
| hanning = sg.hanning(window_size) | |
| for window in range(n_windows): | |
| window_base = window * window_step | |
| index = window_base + np.arange(window_size) | |
| if voiced_frames[window]: | |
| sig = np.zeros_like(t) | |
| cycles = np.cumsum(f[window][0] * t) | |
| sig += sg.sawtooth(cycles, 0.001) | |
| residual_excitation[index] += hanning * sig | |
| residual_excitation[index] += hanning * 0.01 * random_state.randn( | |
| window_size) | |
| else: | |
| n_excitation_points = residual_excitation.shape[0] | |
| n_points = n_excitation_points + window_step + window_step // 2 | |
| residual_excitation = np.hstack((residual_excitation, | |
| np.zeros(window_size))) | |
| if voiced_frames is None: | |
| voiced_frames = np.ones_like(per_frame_gain) | |
| synthesized = np.zeros((n_points)) | |
| for window in range(n_windows): | |
| window_base = window * window_step | |
| oldbit = synthesized[window_base + np.arange(window_step)] | |
| w_coefs = lp_coefficients[window] | |
| if not np.all(w_coefs): | |
| # Hack to make lfilter avoid | |
| # ValueError: BUG: filter coefficient a[0] == 0 not supported yet | |
| # when all coeffs are 0 | |
| w_coefs = [1] | |
| g_coefs = voiced_frames[window] * per_frame_gain[window] | |
| index = window_base + np.arange(window_size) | |
| newbit = g_coefs * sg.lfilter([1], w_coefs, | |
| residual_excitation[index]) | |
| synthesized[index] = np.hstack((oldbit, np.zeros( | |
| (window_size - window_step)))) | |
| synthesized[index] += sg.hanning(window_size) * newbit | |
| synthesized = sg.lfilter([1], [1, -emphasis], synthesized) | |
| return synthesized | |
| def soundsc(X, copy=True): | |
| """ | |
| Approximate implementation of soundsc from MATLAB without the audio playing. | |
| Parameters | |
| ---------- | |
| X : ndarray | |
| Signal to be rescaled | |
| copy : bool, optional (default=True) | |
| Whether to make a copy of input signal or operate in place. | |
| Returns | |
| ------- | |
| X_sc : ndarray | |
| (-1, 1) scaled version of X as float32, suitable for writing | |
| with scipy.io.wavfile | |
| """ | |
| X = np.array(X, copy=copy) | |
| X = (X - X.min()) / (X.max() - X.min()) | |
| X = .9 * X | |
| X = 2 * X - 1 | |
| return X.astype('float32') | |
| def lpc_to_frequency(lp_coefficients, per_frame_gain): | |
| """ | |
| Extract resonant frequencies and magnitudes from LPC coefficients and gains. | |
| Parameters | |
| ---------- | |
| lp_coefficients : ndarray | |
| LPC coefficients, such as those calculated by ``lpc_analysis`` | |
| per_frame_gain : ndarray | |
| Gain calculated for each frame, such as those calculated | |
| by ``lpc_analysis`` | |
| Returns | |
| ------- | |
| frequencies : ndarray | |
| Resonant frequencies calculated from LPC coefficients and gain. Returned | |
| frequencies are from 0 to 2 * pi | |
| magnitudes : ndarray | |
| Magnitudes of resonant frequencies | |
| References | |
| ---------- | |
| D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab", | |
| Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/ | |
| """ | |
| n_windows, order = lp_coefficients.shape | |
| frame_frequencies = np.zeros((n_windows, (order - 1) // 2)) | |
| frame_magnitudes = np.zeros_like(frame_frequencies) | |
| for window in range(n_windows): | |
| w_coefs = lp_coefficients[window] | |
| g_coefs = per_frame_gain[window] | |
| roots = np.roots(np.hstack(([1], w_coefs[1:]))) | |
| # Roots doesn't return the same thing as MATLAB... agh | |
| frequencies, index = np.unique( | |
| np.abs(np.angle(roots)), return_index=True) | |
| # Make sure 0 doesn't show up... | |
| gtz = np.where(frequencies > 0)[0] | |
| frequencies = frequencies[gtz] | |
| index = index[gtz] | |
| magnitudes = g_coefs / (1. - np.abs(roots)) | |
| sort_index = np.argsort(frequencies) | |
| frame_frequencies[window, :len(sort_index)] = frequencies[sort_index] | |
| frame_magnitudes[window, :len(sort_index)] = magnitudes[sort_index] | |
| return frame_frequencies, frame_magnitudes | |
| def sinusoid_analysis(X, input_sample_rate, resample_block=128, copy=True): | |
| """ | |
| Contruct a sinusoidal model for the input signal. | |
| Parameters | |
| ---------- | |
| X : ndarray | |
| Input signal to model | |
| input_sample_rate : int | |
| The sample rate of the input signal | |
| resample_block : int, optional (default=128) | |
| Controls the step size of the sinusoidal model | |
| Returns | |
| ------- | |
| frequencies_hz : ndarray | |
| Frequencies for the sinusoids, in Hz. | |
| magnitudes : ndarray | |
| Magnitudes of sinusoids returned in ``frequencies`` | |
| References | |
| ---------- | |
| D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab", | |
| Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/ | |
| """ | |
| X = np.array(X, copy=copy) | |
| resample_to = 8000 | |
| if input_sample_rate != resample_to: | |
| if input_sample_rate % resample_to != 0: | |
| raise ValueError("Input sample rate must be a multiple of 8k!") | |
| # Should be able to use resample... ? | |
| # resampled_count = round(len(X) * resample_to / input_sample_rate) | |
| # X = sg.resample(X, resampled_count, window=sg.hanning(len(X))) | |
| X = sg.decimate(X, input_sample_rate // resample_to) | |
| step_size = 2 * round(resample_block / input_sample_rate * resample_to / 2.) | |
| a, g, e = lpc_analysis(X, order=8, window_step=step_size, | |
| window_size=2 * step_size) | |
| f, m = lpc_to_frequency(a, g) | |
| f_hz = f * resample_to / (2 * np.pi) | |
| return f_hz, m | |
| def slinterp(X, factor, copy=True): | |
| """ | |
| Slow-ish linear interpolation of a 1D numpy array. There must be some | |
| better function to do this in numpy. | |
| Parameters | |
| ---------- | |
| X : ndarray | |
| 1D input array to interpolate | |
| factor : int | |
| Integer factor to interpolate by | |
| Return | |
| ------ | |
| X_r : ndarray | |
| """ | |
| sz = np.product(X.shape) | |
| X = np.array(X, copy=copy) | |
| X_s = np.hstack((X[1:], [0])) | |
| X_r = np.zeros((factor, sz)) | |
| for i in range(factor): | |
| X_r[i, :] = (factor - i) / float(factor) * X + (i / float(factor)) * X_s | |
| return X_r.T.ravel()[:(sz - 1) * factor + 1] | |
| def sinusoid_synthesis(frequencies_hz, magnitudes, input_sample_rate=16000, | |
| resample_block=128): | |
| """ | |
| Create a time series based on input frequencies and magnitudes. | |
| Parameters | |
| ---------- | |
| frequencies_hz : ndarray | |
| Input signal to model | |
| magnitudes : int | |
| The sample rate of the input signal | |
| input_sample_rate : int, optional (default=16000) | |
| The sample rate parameter that the sinusoid analysis was run with | |
| resample_block : int, optional (default=128) | |
| Controls the step size of the sinusoidal model | |
| Returns | |
| ------- | |
| synthesized : ndarray | |
| Sound vector synthesized from input arguments | |
| References | |
| ---------- | |
| D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab", | |
| Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/ | |
| """ | |
| rows, cols = frequencies_hz.shape | |
| synthesized = np.zeros((1 + ((rows - 1) * resample_block),)) | |
| for col in range(cols): | |
| mags = slinterp(magnitudes[:, col], resample_block) | |
| freqs = slinterp(frequencies_hz[:, col], resample_block) | |
| cycles = np.cumsum(2 * np.pi * freqs / float(input_sample_rate)) | |
| sines = mags * np.cos(cycles) | |
| synthesized += sines | |
| return synthesized | |
| def compress(X, n_components, window_size=128): | |
| """ | |
| Compress using the DCT | |
| Parameters | |
| ---------- | |
| X : ndarray, shape=(n_samples,) | |
| The input signal to compress. Should be 1-dimensional | |
| n_components : int | |
| The number of DCT components to keep. Setting n_components to about | |
| .5 * window_size can give compression with fairly good reconstruction. | |
| window_size : int | |
| The input X is broken into windows of window_size, each of which are | |
| then compressed with the DCT. | |
| Returns | |
| ------- | |
| X_compressed : ndarray, shape=(num_windows, window_size) | |
| A 2D array of non-overlapping DCT coefficients. For use with uncompress | |
| Reference | |
| --------- | |
| http://nbviewer.ipython.org/github/craffel/crucialpython/blob/master/week3/stride_tricks.ipynb | |
| """ | |
| if len(X) % window_size != 0: | |
| append = np.zeros((window_size - len(X) % window_size)) | |
| X = np.hstack((X, append)) | |
| num_frames = len(X) // window_size | |
| X_strided = X.reshape((num_frames, window_size)) | |
| X_dct = fftpack.dct(X_strided, norm='ortho') | |
| if n_components is not None: | |
| X_dct = X_dct[:, :n_components] | |
| return X_dct | |
| def uncompress(X_compressed, window_size=128): | |
| """ | |
| Uncompress a DCT compressed signal (such as returned by ``compress``). | |
| Parameters | |
| ---------- | |
| X_compressed : ndarray, shape=(n_samples, n_features) | |
| Windowed and compressed array. | |
| window_size : int, optional (default=128) | |
| Size of the window used when ``compress`` was called. | |
| Returns | |
| ------- | |
| X_reconstructed : ndarray, shape=(n_samples) | |
| Reconstructed version of X. | |
| """ | |
| if X_compressed.shape[1] % window_size != 0: | |
| append = np.zeros((X_compressed.shape[0], | |
| window_size - X_compressed.shape[1] % window_size)) | |
| X_compressed = np.hstack((X_compressed, append)) | |
| X_r = fftpack.idct(X_compressed, norm='ortho') | |
| return X_r.ravel() | |
| def sine_window(X): | |
| """ | |
| Apply a sinusoid window to X. | |
| Parameters | |
| ---------- | |
| X : ndarray, shape=(n_samples, n_features) | |
| Input array of samples | |
| Returns | |
| ------- | |
| X_windowed : ndarray, shape=(n_samples, n_features) | |
| Windowed version of X. | |
| """ | |
| i = np.arange(X.shape[1]) | |
| win = np.sin(np.pi * (i + 0.5) / X.shape[1]) | |
| row_stride = 0 | |
| col_stride = win.itemsize | |
| strided_win = as_strided(win, shape=X.shape, | |
| strides=(row_stride, col_stride)) | |
| return X * strided_win | |
| def kaiserbessel_window(X, alpha=6.5): | |
| """ | |
| Apply a Kaiser-Bessel window to X. | |
| Parameters | |
| ---------- | |
| X : ndarray, shape=(n_samples, n_features) | |
| Input array of samples | |
| alpha : float, optional (default=6.5) | |
| Tuning parameter for Kaiser-Bessel function. alpha=6.5 should make | |
| perfect reconstruction possible for DCT. | |
| Returns | |
| ------- | |
| X_windowed : ndarray, shape=(n_samples, n_features) | |
| Windowed version of X. | |
| """ | |
| beta = np.pi * alpha | |
| win = sg.kaiser(X.shape[1], beta) | |
| row_stride = 0 | |
| col_stride = win.itemsize | |
| strided_win = as_strided(win, shape=X.shape, | |
| strides=(row_stride, col_stride)) | |
| return X * strided_win | |
| def overlap(X, window_size, window_step): | |
| """ | |
| Create an overlapped version of X | |
| Parameters | |
| ---------- | |
| X : ndarray, shape=(n_samples,) | |
| Input signal to window and overlap | |
| window_size : int | |
| Size of windows to take | |
| window_step : int | |
| Step size between windows | |
| Returns | |
| ------- | |
| X_strided : shape=(n_windows, window_size) | |
| 2D array of overlapped X | |
| """ | |
| if window_size % 2 != 0: | |
| raise ValueError("Window size must be even!") | |
| # Make sure there are an even number of windows before stridetricks | |
| append = np.zeros((window_size - len(X) % window_size)) | |
| X = np.hstack((X, append)) | |
| num_frames = len(X) // window_step - 1 | |
| row_stride = X.itemsize * window_step | |
| col_stride = X.itemsize | |
| X_strided = as_strided(X, shape=(num_frames, window_size), | |
| strides=(row_stride, col_stride)) | |
| return X_strided | |
| def halfoverlap(X, window_size): | |
| """ | |
| Create an overlapped version of X using 50% of window_size as overlap. | |
| Parameters | |
| ---------- | |
| X : ndarray, shape=(n_samples,) | |
| Input signal to window and overlap | |
| window_size : int | |
| Size of windows to take | |
| Returns | |
| ------- | |
| X_strided : shape=(n_windows, window_size) | |
| 2D array of overlapped X | |
| """ | |
| if window_size % 2 != 0: | |
| raise ValueError("Window size must be even!") | |
| window_step = window_size // 2 | |
| # Make sure there are an even number of windows before stridetricks | |
| append = np.zeros((window_size - len(X) % window_size)) | |
| X = np.hstack((X, append)) | |
| num_frames = len(X) // window_step - 1 | |
| row_stride = X.itemsize * window_step | |
| col_stride = X.itemsize | |
| X_strided = as_strided(X, shape=(num_frames, window_size), | |
| strides=(row_stride, col_stride)) | |
| return X_strided | |
| def invert_halfoverlap(X_strided): | |
| """ | |
| Invert ``halfoverlap`` function to reconstruct X | |
| Parameters | |
| ---------- | |
| X_strided : ndarray, shape=(n_windows, window_size) | |
| X as overlapped windows | |
| Returns | |
| ------- | |
| X : ndarray, shape=(n_samples,) | |
| Reconstructed version of X | |
| """ | |
| # Hardcoded 50% overlap! Can generalize later... | |
| n_rows, n_cols = X_strided.shape | |
| X = np.zeros((((int(n_rows // 2) + 1) * n_cols),)).astype(X_strided.dtype) | |
| start_index = 0 | |
| end_index = n_cols | |
| window_step = n_cols // 2 | |
| for row in range(X_strided.shape[0]): | |
| X[start_index:end_index] += X_strided[row] | |
| start_index += window_step | |
| end_index += window_step | |
| return X | |
| def csvd(arr): | |
| """ | |
| Do the complex SVD of a 2D array, returning real valued U, S, VT | |
| http://stemblab.github.io/complex-svd/ | |
| """ | |
| C_r = arr.real | |
| C_i = arr.imag | |
| block_x = C_r.shape[0] | |
| block_y = C_r.shape[1] | |
| K = np.zeros((2 * block_x, 2 * block_y)) | |
| # Upper left | |
| K[:block_x, :block_y] = C_r | |
| # Lower left | |
| K[:block_x, block_y:] = C_i | |
| # Upper right | |
| K[block_x:, :block_y] = -C_i | |
| # Lower right | |
| K[block_x:, block_y:] = C_r | |
| return svd(K, full_matrices=False) | |
| def icsvd(U, S, VT): | |
| """ | |
| Invert back to complex values from the output of csvd | |
| U, S, VT = csvd(X) | |
| X_rec = inv_csvd(U, S, VT) | |
| """ | |
| K = U.dot(np.diag(S)).dot(VT) | |
| block_x = U.shape[0] // 2 | |
| block_y = U.shape[1] // 2 | |
| arr_rec = np.zeros((block_x, block_y)) + 0j | |
| arr_rec.real = K[:block_x, :block_y] | |
| arr_rec.imag = K[:block_x, block_y:] | |
| return arr_rec | |
| def overlap_compress(X, n_components, window_size): | |
| """ | |
| Overlap (at 50% of window_size) and compress X. | |
| Parameters | |
| ---------- | |
| X : ndarray, shape=(n_samples,) | |
| Input signal to compress | |
| n_components : int | |
| number of DCT components to keep | |
| window_size : int | |
| Size of windows to take | |
| Returns | |
| ------- | |
| X_dct : ndarray, shape=(n_windows, n_components) | |
| Windowed and compressed version of X | |
| """ | |
| X_strided = halfoverlap(X, window_size) | |
| X_dct = fftpack.dct(X_strided, norm='ortho') | |
| if n_components is not None: | |
| X_dct = X_dct[:, :n_components] | |
| return X_dct | |
| # Evil voice is caused by adding double the zeros before inverse DCT... | |
| # Very cool bug but makes sense | |
| def overlap_uncompress(X_compressed, window_size): | |
| """ | |
| Uncompress X as returned from ``overlap_compress``. | |
| Parameters | |
| ---------- | |
| X_compressed : ndarray, shape=(n_windows, n_components) | |
| Windowed and compressed version of X | |
| window_size : int | |
| Size of windows originally used when compressing X | |
| Returns | |
| ------- | |
| X_reconstructed : ndarray, shape=(n_samples,) | |
| Reconstructed version of X | |
| """ | |
| if X_compressed.shape[1] % window_size != 0: | |
| append = np.zeros((X_compressed.shape[0], window_size - | |
| X_compressed.shape[1] % window_size)) | |
| X_compressed = np.hstack((X_compressed, append)) | |
| X_r = fftpack.idct(X_compressed, norm='ortho') | |
| return invert_halfoverlap(X_r) | |
| def lpc_to_lsf(all_lpc): | |
| if len(all_lpc.shape) < 2: | |
| all_lpc = all_lpc[None] | |
| order = all_lpc.shape[1] - 1 | |
| all_lsf = np.zeros((len(all_lpc), order)) | |
| for i in range(len(all_lpc)): | |
| lpc = all_lpc[i] | |
| lpc1 = np.append(lpc, 0) | |
| lpc2 = lpc1[::-1] | |
| sum_filt = lpc1 + lpc2 | |
| diff_filt = lpc1 - lpc2 | |
| if order % 2 != 0: | |
| deconv_diff, _ = sg.deconvolve(diff_filt, [1, 0, -1]) | |
| deconv_sum = sum_filt | |
| else: | |
| deconv_diff, _ = sg.deconvolve(diff_filt, [1, -1]) | |
| deconv_sum, _ = sg.deconvolve(sum_filt, [1, 1]) | |
| roots_diff = np.roots(deconv_diff) | |
| roots_sum = np.roots(deconv_sum) | |
| angle_diff = np.angle(roots_diff[::2]) | |
| angle_sum = np.angle(roots_sum[::2]) | |
| lsf = np.sort(np.hstack((angle_diff, angle_sum))) | |
| if len(lsf) != 0: | |
| all_lsf[i] = lsf | |
| return np.squeeze(all_lsf) | |
| def lsf_to_lpc(all_lsf): | |
| if len(all_lsf.shape) < 2: | |
| all_lsf = all_lsf[None] | |
| order = all_lsf.shape[1] | |
| all_lpc = np.zeros((len(all_lsf), order + 1)) | |
| for i in range(len(all_lsf)): | |
| lsf = all_lsf[i] | |
| zeros = np.exp(1j * lsf) | |
| sum_zeros = zeros[::2] | |
| diff_zeros = zeros[1::2] | |
| sum_zeros = np.hstack((sum_zeros, np.conj(sum_zeros))) | |
| diff_zeros = np.hstack((diff_zeros, np.conj(diff_zeros))) | |
| sum_filt = np.poly(sum_zeros) | |
| diff_filt = np.poly(diff_zeros) | |
| if order % 2 != 0: | |
| deconv_diff = sg.convolve(diff_filt, [1, 0, -1]) | |
| deconv_sum = sum_filt | |
| else: | |
| deconv_diff = sg.convolve(diff_filt, [1, -1]) | |
| deconv_sum = sg.convolve(sum_filt, [1, 1]) | |
| lpc = .5 * (deconv_sum + deconv_diff) | |
| # Last coefficient is 0 and not returned | |
| all_lpc[i] = lpc[:-1] | |
| return np.squeeze(all_lpc) | |
| def herz_to_mel(freqs): | |
| """ | |
| Based on code by Dan Ellis | |
| http://labrosa.ee.columbia.edu/matlab/tf_agc/ | |
| """ | |
| f_0 = 0 # 133.33333 | |
| f_sp = 200 / 3. # 66.66667 | |
| bark_freq = 1000. | |
| bark_pt = (bark_freq - f_0) / f_sp | |
| # The magic 1.0711703 which is the ratio needed to get from 1000 Hz | |
| # to 6400 Hz in 27 steps, and is *almost* the ratio between 1000 Hz | |
| # and the preceding linear filter center at 933.33333 Hz | |
| # (actually 1000/933.33333 = 1.07142857142857 and | |
| # exp(log(6.4)/27) = 1.07117028749447) | |
| if not isinstance(freqs, np.ndarray): | |
| freqs = np.array(freqs)[None] | |
| log_step = np.exp(np.log(6.4) / 27) | |
| lin_pts = (freqs < bark_freq) | |
| mel = 0. * freqs | |
| mel[lin_pts] = (freqs[lin_pts] - f_0) / f_sp | |
| mel[~lin_pts] = bark_pt + np.log(freqs[~lin_pts] / bark_freq) / np.log( | |
| log_step) | |
| return mel | |
| def mel_to_herz(mel): | |
| """ | |
| Based on code by Dan Ellis | |
| http://labrosa.ee.columbia.edu/matlab/tf_agc/ | |
| """ | |
| f_0 = 0 # 133.33333 | |
| f_sp = 200 / 3. # 66.66667 | |
| bark_freq = 1000. | |
| bark_pt = (bark_freq - f_0) / f_sp | |
| # The magic 1.0711703 which is the ratio needed to get from 1000 Hz | |
| # to 6400 Hz in 27 steps, and is *almost* the ratio between 1000 Hz | |
| # and the preceding linear filter center at 933.33333 Hz | |
| # (actually 1000/933.33333 = 1.07142857142857 and | |
| # exp(log(6.4)/27) = 1.07117028749447) | |
| if not isinstance(mel, np.ndarray): | |
| mel = np.array(mel)[None] | |
| log_step = np.exp(np.log(6.4) / 27) | |
| lin_pts = (mel < bark_pt) | |
| freqs = 0. * mel | |
| freqs[lin_pts] = f_0 + f_sp * mel[lin_pts] | |
| freqs[~lin_pts] = bark_freq * np.exp(np.log(log_step) * ( | |
| mel[~lin_pts] - bark_pt)) | |
| return freqs | |
| def mel_freq_weights(n_fft, fs, n_filts=None, width=None): | |
| """ | |
| Based on code by Dan Ellis | |
| http://labrosa.ee.columbia.edu/matlab/tf_agc/ | |
| """ | |
| min_freq = 0 | |
| max_freq = fs // 2 | |
| if width is None: | |
| width = 1. | |
| if n_filts is None: | |
| n_filts = int(herz_to_mel(max_freq) / 2) + 1 | |
| else: | |
| n_filts = int(n_filts) | |
| assert n_filts > 0 | |
| weights = np.zeros((n_filts, n_fft)) | |
| fft_freqs = np.arange(n_fft // 2) / n_fft * fs | |
| min_mel = herz_to_mel(min_freq) | |
| max_mel = herz_to_mel(max_freq) | |
| partial = np.arange(n_filts + 2) / (n_filts + 1) * (max_mel - min_mel) | |
| bin_freqs = mel_to_herz(min_mel + partial) | |
| bin_bin = np.round(bin_freqs / fs * (n_fft - 1)) | |
| for i in range(n_filts): | |
| fs_i = bin_freqs[i + np.arange(3)] | |
| fs_i = fs_i[1] + width * (fs_i - fs_i[1]) | |
| lo_slope = (fft_freqs - fs_i[0]) / float(fs_i[1] - fs_i[0]) | |
| hi_slope = (fs_i[2] - fft_freqs) / float(fs_i[2] - fs_i[1]) | |
| weights[i, :n_fft // 2] = np.maximum( | |
| 0, np.minimum(lo_slope, hi_slope)) | |
| # Constant amplitude multiplier | |
| weights = np.diag(2. / (bin_freqs[2:n_filts + 2] | |
| - bin_freqs[:n_filts])).dot(weights) | |
| weights[:, n_fft // 2:] = 0 | |
| return weights | |
| def time_attack_agc(X, fs, t_scale=0.5, f_scale=1.): | |
| """ | |
| AGC based on code by Dan Ellis | |
| http://labrosa.ee.columbia.edu/matlab/tf_agc/ | |
| """ | |
| # 32 ms grid for FFT | |
| n_fft = 2 ** int(np.log(0.032 * fs) / np.log(2)) | |
| f_scale = float(f_scale) | |
| window_size = n_fft | |
| window_step = window_size // 2 | |
| X_freq = stft(X, window_size, mean_normalize=False) | |
| fft_fs = fs / window_step | |
| n_bands = max(10, 20 / f_scale) | |
| mel_width = f_scale * n_bands / 10. | |
| f_to_a = mel_freq_weights(n_fft, fs, n_bands, mel_width) | |
| f_to_a = f_to_a[:, :n_fft // 2] | |
| audiogram = np.abs(X_freq).dot(f_to_a.T) | |
| fbg = np.zeros_like(audiogram) | |
| state = np.zeros((audiogram.shape[1],)) | |
| alpha = np.exp(-(1. / fft_fs) / t_scale) | |
| for i in range(len(audiogram)): | |
| state = np.maximum(alpha * state, audiogram[i]) | |
| fbg[i] = state | |
| sf_to_a = np.sum(f_to_a, axis=0) | |
| E = np.diag(1. / (sf_to_a + (sf_to_a == 0))) | |
| E = E.dot(f_to_a.T) | |
| E = fbg.dot(E.T) | |
| E[E <= 0] = np.min(E[E > 0]) | |
| ts = istft(X_freq / E, window_size, mean_normalize=False) | |
| return ts, X_freq, E | |
| def hebbian_kmeans(X, n_clusters=10, n_epochs=10, W=None, learning_rate=0.01, | |
| batch_size=100, random_state=None, verbose=True): | |
| """ | |
| Modified from existing code from R. Memisevic | |
| See http://www.cs.toronto.edu/~rfm/code/hebbian_kmeans.py | |
| """ | |
| if W is None: | |
| if random_state is None: | |
| random_state = np.random.RandomState() | |
| W = 0.1 * random_state.randn(n_clusters, X.shape[1]) | |
| else: | |
| assert n_clusters == W.shape[0] | |
| X2 = (X ** 2).sum(axis=1, keepdims=True) | |
| last_print = 0 | |
| for e in range(n_epochs): | |
| for i in range(0, X.shape[0], batch_size): | |
| X_i = X[i: i + batch_size] | |
| X2_i = X2[i: i + batch_size] | |
| D = -2 * np.dot(W, X_i.T) | |
| D += (W ** 2).sum(axis=1, keepdims=True) | |
| D += X2_i.T | |
| S = (D == D.min(axis=0)[None, :]).astype("float").T | |
| W += learning_rate * ( | |
| np.dot(S.T, X_i) - S.sum(axis=0)[:, None] * W) | |
| if verbose: | |
| if e == 0 or e > (.05 * n_epochs + last_print): | |
| last_print = e | |
| print("Epoch %i of %i, cost %.4f" % ( | |
| e + 1, n_epochs, D.min(axis=0).sum())) | |
| return W | |
| def complex_to_real_view(arr_c): | |
| # Inplace view from complex to r, i as separate columns | |
| assert arr_c.dtype in [np.complex64, np.complex128] | |
| shp = arr_c.shape | |
| dtype = np.float64 if arr_c.dtype == np.complex128 else np.float32 | |
| arr_r = arr_c.ravel().view(dtype=dtype).reshape(shp[0], 2 * shp[1]) | |
| return arr_r | |
| def real_to_complex_view(arr_r): | |
| # Inplace view from real, image as columns to complex | |
| assert arr_r.dtype not in [np.complex64, np.complex128] | |
| shp = arr_r.shape | |
| dtype = np.complex128 if arr_r.dtype == np.float64 else np.complex64 | |
| arr_c = arr_r.ravel().view(dtype=dtype).reshape(shp[0], shp[1] // 2) | |
| return arr_c | |
| def complex_to_abs(arr_c): | |
| return np.abs(arr_c) | |
| def complex_to_angle(arr_c): | |
| return np.angle(arr_c) | |
| def abs_and_angle_to_complex(arr_abs, arr_angle): | |
| # abs(f_c2 - f_c) < 1E-15 | |
| return arr_abs * np.exp(1j * arr_angle) | |
| def polyphase_core(x, m, f): | |
| # x = input data | |
| # m = decimation rate | |
| # f = filter | |
| # Hack job - append zeros to match decimation rate | |
| if x.shape[0] % m != 0: | |
| x = np.append(x, np.zeros((m - x.shape[0] % m,))) | |
| if f.shape[0] % m != 0: | |
| f = np.append(f, np.zeros((m - f.shape[0] % m,))) | |
| polyphase = p = np.zeros((m, (x.shape[0] + f.shape[0]) / m), dtype=x.dtype) | |
| p[0, :-1] = np.convolve(x[::m], f[::m]) | |
| # Invert the x values when applying filters | |
| for i in range(1, m): | |
| p[i, 1:] = np.convolve(x[m - i::m], f[i::m]) | |
| return p | |
| def polyphase_single_filter(x, m, f): | |
| return np.sum(polyphase_core(x, m, f), axis=0) | |
| def polyphase_lowpass(arr, downsample=2, n_taps=50, filter_pad=1.1): | |
| filt = firwin(downsample * n_taps, 1 / (downsample * filter_pad)) | |
| filtered = polyphase_single_filter(arr, downsample, filt) | |
| return filtered | |
| def window(arr, window_size, window_step=1, axis=0): | |
| """ | |
| Directly taken from Erik Rigtorp's post to numpy-discussion. | |
| <http://www.mail-archive.com/[email protected]/msg29450.html> | |
| <http://stackoverflow.com/questions/4936620/using-strides-for-an-efficient-moving-average-filter> | |
| """ | |
| if window_size < 1: | |
| raise ValueError("`window_size` must be at least 1.") | |
| if window_size > arr.shape[-1]: | |
| raise ValueError("`window_size` is too long.") | |
| orig = list(range(len(arr.shape))) | |
| trans = list(range(len(arr.shape))) | |
| trans[axis] = orig[-1] | |
| trans[-1] = orig[axis] | |
| arr = arr.transpose(trans) | |
| shape = arr.shape[:-1] + (arr.shape[-1] - window_size + 1, window_size) | |
| strides = arr.strides + (arr.strides[-1],) | |
| strided = as_strided(arr, shape=shape, strides=strides) | |
| if window_step > 1: | |
| strided = strided[..., ::window_step, :] | |
| orig = list(range(len(strided.shape))) | |
| trans = list(range(len(strided.shape))) | |
| trans[-2] = orig[-1] | |
| trans[-1] = orig[-2] | |
| trans = trans[::-1] | |
| strided = strided.transpose(trans) | |
| return strided | |
| def unwindow(arr, window_size, window_step=1, axis=0): | |
| # undo windows by broadcast | |
| if axis != 0: | |
| raise ValueError("axis != 0 currently unsupported") | |
| shp = arr.shape | |
| unwindowed = np.tile(arr[:, None, ...], (1, window_step, 1, 1)) | |
| unwindowed = unwindowed.reshape(shp[0] * window_step, *shp[1:]) | |
| return unwindowed.mean(axis=1) | |
| def angle_to_sin_cos(arr_angle): | |
| return np.hstack((np.sin(arr_angle), np.cos(arr_angle))) | |
| def sin_cos_to_angle(arr_sin, arr_cos): | |
| return np.arctan2(arr_sin, arr_cos) | |
| def xcorr_offset(x1, x2): | |
| """ | |
| Under MSR-LA License | |
| Based on MATLAB implementation from Spectrogram Inversion Toolbox | |
| References | |
| ---------- | |
| D. Griffin and J. Lim. Signal estimation from modified | |
| short-time Fourier transform. IEEE Trans. Acoust. Speech | |
| Signal Process., 32(2):236-243, 1984. | |
| Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory | |
| Model Inversion for Sound Separation. Proc. IEEE-ICASSP, | |
| Adelaide, 1994, II.77-80. | |
| Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal | |
| Estimation from Modified Short-Time Fourier Transform | |
| Magnitude Spectra. IEEE Transactions on Audio Speech and | |
| Language Processing, 08/2007. | |
| """ | |
| x1 = x1 - x1.mean() | |
| x2 = x2 - x2.mean() | |
| frame_size = len(x2) | |
| half = frame_size // 2 | |
| corrs = np.convolve(x1.astype('float32'), x2[::-1].astype('float32')) | |
| corrs[:half] = -1E30 | |
| corrs[-half:] = -1E30 | |
| offset = corrs.argmax() - len(x1) | |
| return offset | |
| def invert_spectrogram(X_s, step, calculate_offset=True, set_zero_phase=True): | |
| """ | |
| Under MSR-LA License | |
| Based on MATLAB implementation from Spectrogram Inversion Toolbox | |
| References | |
| ---------- | |
| D. Griffin and J. Lim. Signal estimation from modified | |
| short-time Fourier transform. IEEE Trans. Acoust. Speech | |
| Signal Process., 32(2):236-243, 1984. | |
| Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory | |
| Model Inversion for Sound Separation. Proc. IEEE-ICASSP, | |
| Adelaide, 1994, II.77-80. | |
| Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal | |
| Estimation from Modified Short-Time Fourier Transform | |
| Magnitude Spectra. IEEE Transactions on Audio Speech and | |
| Language Processing, 08/2007. | |
| """ | |
| size = int(X_s.shape[1] // 2) | |
| wave = np.zeros((X_s.shape[0] * step + size)) | |
| # Getting overflow warnings with 32 bit... | |
| wave = wave.astype('float64') | |
| total_windowing_sum = np.zeros((X_s.shape[0] * step + size)) | |
| win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1)) | |
| est_start = int(size // 2) - 1 | |
| est_end = est_start + size | |
| for i in range(X_s.shape[0]): | |
| wave_start = int(step * i) | |
| wave_end = wave_start + size | |
| if set_zero_phase: | |
| spectral_slice = X_s[i].real + 0j | |
| else: | |
| # already complex | |
| spectral_slice = X_s[i] | |
| # Don't need fftshift due to different impl. | |
| wave_est = np.real(np.fft.ifft(spectral_slice))[::-1] | |
| if calculate_offset and i > 0: | |
| offset_size = size - step | |
| if offset_size <= 0: | |
| print("WARNING: Large step size >50\% detected! " | |
| "This code works best with high overlap - try " | |
| "with 75% or greater") | |
| offset_size = step | |
| offset = xcorr_offset(wave[wave_start:wave_start + offset_size], | |
| wave_est[est_start:est_start + offset_size]) | |
| else: | |
| offset = 0 | |
| wave[wave_start:wave_end] += win * wave_est[ | |
| est_start - offset:est_end - offset] | |
| total_windowing_sum[wave_start:wave_end] += win | |
| wave = np.real(wave) / (total_windowing_sum + 1E-6) | |
| return wave | |
| def iterate_invert_spectrogram(X_s, fftsize, step, n_iter=10, verbose=False): | |
| """ | |
| Under MSR-LA License | |
| Based on MATLAB implementation from Spectrogram Inversion Toolbox | |
| References | |
| ---------- | |
| D. Griffin and J. Lim. Signal estimation from modified | |
| short-time Fourier transform. IEEE Trans. Acoust. Speech | |
| Signal Process., 32(2):236-243, 1984. | |
| Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory | |
| Model Inversion for Sound Separation. Proc. IEEE-ICASSP, | |
| Adelaide, 1994, II.77-80. | |
| Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal | |
| Estimation from Modified Short-Time Fourier Transform | |
| Magnitude Spectra. IEEE Transactions on Audio Speech and | |
| Language Processing, 08/2007. | |
| """ | |
| reg = np.max(X_s) / 1E8 | |
| X_best = copy.deepcopy(X_s) | |
| for i in range(n_iter): | |
| if verbose: | |
| print("Runnning iter %i" % i) | |
| if i == 0: | |
| X_t = invert_spectrogram(X_best, step, calculate_offset=True, | |
| set_zero_phase=True) | |
| else: | |
| # Calculate offset was False in the MATLAB version | |
| # but in mine it massively improves the result | |
| # Possible bug in my impl? | |
| X_t = invert_spectrogram(X_best, step, calculate_offset=True, | |
| set_zero_phase=False) | |
| est = stft(X_t, fftsize=fftsize, step=step, compute_onesided=False) | |
| phase = est / np.maximum(reg, np.abs(est)) | |
| X_best = X_s * phase[:len(X_s)] | |
| X_t = invert_spectrogram(X_best, step, calculate_offset=True, | |
| set_zero_phase=False) | |
| return np.real(X_t) | |
| def implot(arr, scale=None, title="", cmap="gray"): | |
| import matplotlib.pyplot as plt | |
| if scale is "specgram": | |
| # plotting part | |
| mag = 20. * np.log10(np.abs(arr)) | |
| # Transpose so time is X axis, and invert y axis so | |
| # frequency is low at bottom | |
| mag = mag.T[::-1, :] | |
| else: | |
| mag = arr | |
| f, ax = plt.subplots() | |
| ax.matshow(mag, cmap=cmap) | |
| plt.axis("off") | |
| x1 = mag.shape[0] | |
| y1 = mag.shape[1] | |
| def autoaspect(x_range, y_range): | |
| """ | |
| The aspect to make a plot square with ax.set_aspect in Matplotlib | |
| """ | |
| mx = max(x_range, y_range) | |
| mn = min(x_range, y_range) | |
| if x_range <= y_range: | |
| return mx / float(mn) | |
| else: | |
| return mn / float(mx) | |
| asp = autoaspect(x1, y1) | |
| ax.set_aspect(asp) | |
| plt.title(title) | |
| def test_lpc_to_lsf(): | |
| # Matlab style vectors for testing | |
| # lsf = [0.7842 1.5605 1.8776 1.8984 2.3593] | |
| # a = [1.0000 0.6149 0.9899 0.0000 0.0031 -0.0082]; | |
| lsf = [[0.7842, 1.5605, 1.8776, 1.8984, 2.3593], | |
| [0.7842, 1.5605, 1.8776, 1.8984, 2.3593]] | |
| a = [[1.0000, 0.6149, 0.9899, 0.0000, 0.0031, -0.0082], | |
| [1.0000, 0.6149, 0.9899, 0.0000, 0.0031, -0.0082]] | |
| a = np.array(a) | |
| lsf = np.array(lsf) | |
| lsf_r = lpc_to_lsf(a) | |
| assert_almost_equal(lsf, lsf_r, decimal=4) | |
| a_r = lsf_to_lpc(lsf) | |
| assert_almost_equal(a, a_r, decimal=4) | |
| lsf_r = lpc_to_lsf(a[0]) | |
| assert_almost_equal(lsf[0], lsf_r, decimal=4) | |
| a_r = lsf_to_lpc(lsf[0]) | |
| assert_almost_equal(a[0], a_r, decimal=4) | |
| def test_lpc_analysis_truncate(): | |
| # Test that truncate doesn't crash and actually truncates | |
| [a, g, e] = lpc_analysis(np.random.randn(85), order=8, window_step=80, | |
| window_size=80, emphasis=0.9, truncate=True) | |
| assert(a.shape[0] == 1) | |
| def test_feature_build(): | |
| samplerate, X = fetch_sample_music() | |
| # MATLAB wavread does normalization | |
| X = X.astype('float32') / (2 ** 15) | |
| wsz = 256 | |
| wst = 128 | |
| a, g, e = lpc_analysis(X, order=8, window_step=wst, | |
| window_size=wsz, emphasis=0.9, | |
| copy=True) | |
| v, p = voiced_unvoiced(X, window_size=wsz, | |
| window_step=wst) | |
| c = compress(e, n_components=64) | |
| # First component of a is always 1 | |
| combined = np.hstack((a[:, 1:], g, c[:a.shape[0]])) | |
| features = np.zeros((a.shape[0], 2 * combined.shape[1])) | |
| start_indices = v * combined.shape[1] | |
| start_indices = start_indices.astype('int32') | |
| end_indices = (v + 1) * combined.shape[1] | |
| end_indices = end_indices.astype('int32') | |
| for i in range(features.shape[0]): | |
| features[i, start_indices[i]:end_indices[i]] = combined[i] | |
| def test_mdct_and_inverse(): | |
| fs, X = fetch_sample_music() | |
| X_dct = mdct_slow(X) | |
| X_r = imdct_slow(X_dct) | |
| assert np.all(np.abs(X_r[:len(X)] - X) < 1E-3) | |
| assert np.abs(X_r[:len(X)] - X).mean() < 1E-6 | |
| def test_all(): | |
| test_lpc_analysis_truncate() | |
| test_feature_build() | |
| test_lpc_to_lsf() | |
| test_mdct_and_inverse() | |
| def run_lpc_example(): | |
| # ae.wav is from | |
| # http://www.linguistics.ucla.edu/people/hayes/103/Charts/VChart/ae.wav | |
| # Partially following the formant tutorial here | |
| # http://www.mathworks.com/help/signal/ug/formant-estimation-with-lpc-coefficients.html | |
| samplerate, X = fetch_sample_music() | |
| c = overlap_compress(X, 200, 400) | |
| X_r = overlap_uncompress(c, 400) | |
| wavfile.write('lpc_uncompress.wav', samplerate, soundsc(X_r)) | |
| print("Calculating sinusoids") | |
| f_hz, m = sinusoid_analysis(X, input_sample_rate=16000) | |
| Xs_sine = sinusoid_synthesis(f_hz, m) | |
| orig_fname = 'lpc_orig.wav' | |
| sine_fname = 'lpc_sine_synth.wav' | |
| wavfile.write(orig_fname, samplerate, soundsc(X)) | |
| wavfile.write(sine_fname, samplerate, soundsc(Xs_sine)) | |
| lpc_order_list = [8, ] | |
| dct_components_list = [200, ] | |
| window_size_list = [400, ] | |
| # Seems like a dct component size of ~2/3rds the step | |
| # (1/3rd the window for 50% overlap) works well. | |
| for lpc_order in lpc_order_list: | |
| for dct_components in dct_components_list: | |
| for window_size in window_size_list: | |
| # 50% overlap | |
| window_step = window_size // 2 | |
| a, g, e = lpc_analysis(X, order=lpc_order, | |
| window_step=window_step, | |
| window_size=window_size, emphasis=0.9, | |
| copy=True) | |
| print("Calculating LSF") | |
| lsf = lpc_to_lsf(a) | |
| # Not window_size - window_step! Need to implement overlap | |
| print("Calculating compression") | |
| c = compress(e, n_components=dct_components, | |
| window_size=window_step) | |
| co = overlap_compress(e, n_components=dct_components, | |
| window_size=window_step) | |
| block_excitation = uncompress(c, window_size=window_step) | |
| overlap_excitation = overlap_uncompress(co, | |
| window_size=window_step) | |
| a_r = lsf_to_lpc(lsf) | |
| f, m = lpc_to_frequency(a_r, g) | |
| block_lpc = lpc_synthesis(a_r, g, block_excitation, | |
| emphasis=0.9, | |
| window_step=window_step) | |
| overlap_lpc = lpc_synthesis(a_r, g, overlap_excitation, | |
| emphasis=0.9, | |
| window_step=window_step) | |
| v, p = voiced_unvoiced(X, window_size=window_size, | |
| window_step=window_step) | |
| noisy_lpc = lpc_synthesis(a_r, g, voiced_frames=v, | |
| emphasis=0.9, | |
| window_step=window_step) | |
| if dct_components is None: | |
| dct_components = window_size | |
| noisy_fname = 'lpc_noisy_synth_%iwin_%ilpc_%idct.wav' % ( | |
| window_size, lpc_order, dct_components) | |
| block_fname = 'lpc_block_synth_%iwin_%ilpc_%idct.wav' % ( | |
| window_size, lpc_order, dct_components) | |
| overlap_fname = 'lpc_overlap_synth_%iwin_%ilpc_%idct.wav' % ( | |
| window_size, lpc_order, dct_components) | |
| wavfile.write(noisy_fname, samplerate, soundsc(noisy_lpc)) | |
| wavfile.write(block_fname, samplerate, | |
| soundsc(block_lpc)) | |
| wavfile.write(overlap_fname, samplerate, | |
| soundsc(overlap_lpc)) | |
| def run_fft_vq_example(): | |
| n_fft = 512 | |
| time_smoothing = 4 | |
| def _pre(list_of_data): | |
| f_c = np.vstack([stft(dd, n_fft) for dd in list_of_data]) | |
| if len(f_c) % time_smoothing != 0: | |
| newlen = len(f_c) - len(f_c) % time_smoothing | |
| f_c = f_c[:newlen] | |
| f_mag = complex_to_abs(f_c) | |
| f_phs = complex_to_angle(f_c) | |
| f_sincos = angle_to_sin_cos(f_phs) | |
| f_r = np.hstack((f_mag, f_sincos)) | |
| f_r = f_r.reshape((len(f_r) // time_smoothing, | |
| time_smoothing * f_r.shape[1])) | |
| return f_r, n_fft | |
| def preprocess_train(list_of_data, random_state): | |
| f_r, n_fft = _pre(list_of_data) | |
| clusters = f_r | |
| return clusters | |
| def apply_preprocess(list_of_data, clusters): | |
| f_r, n_fft = _pre(list_of_data) | |
| memberships, distances = vq(f_r, clusters) | |
| vq_r = clusters[memberships] | |
| vq_r = vq_r.reshape((time_smoothing * len(vq_r), | |
| vq_r.shape[1] // time_smoothing)) | |
| f_mag = vq_r[:, :n_fft // 2] | |
| f_sincos = vq_r[:, n_fft // 2:] | |
| extent = f_sincos.shape[1] // 2 | |
| f_phs = sin_cos_to_angle(f_sincos[:, :extent], f_sincos[:, extent:]) | |
| vq_c = abs_and_angle_to_complex(f_mag, f_phs) | |
| d_k = istft(vq_c, fftsize=n_fft) | |
| return d_k | |
| random_state = np.random.RandomState(1999) | |
| """ | |
| fs, d = fetch_sample_music() | |
| sub = int(.8 * d.shape[0]) | |
| d1 = [d[:sub]] | |
| d2 = [d[sub:]] | |
| """ | |
| fs, d = fetch_sample_speech_fruit() | |
| d1 = d[::8] + d[1::8] + d[2::8] + d[3::8] + d[4::8] + d[5::8] + d[6::8] | |
| d2 = d[7::8] | |
| # make sure d1 and d2 aren't the same! | |
| assert [len(di) for di in d1] != [len(di) for di in d2] | |
| clusters = preprocess_train(d1, random_state) | |
| # Training data | |
| vq_d1 = apply_preprocess(d1, clusters) | |
| vq_d2 = apply_preprocess(d2, clusters) | |
| assert [i != j for i, j in zip(vq_d1.ravel(), vq_d2.ravel())] | |
| fix_d1 = np.concatenate(d1) | |
| fix_d2 = np.concatenate(d2) | |
| wavfile.write("fft_train_no_agc.wav", fs, soundsc(fix_d1)) | |
| wavfile.write("fft_test_no_agc.wav", fs, soundsc(fix_d2)) | |
| wavfile.write("fft_vq_train_no_agc.wav", fs, soundsc(vq_d1, fs)) | |
| wavfile.write("fft_vq_test_no_agc.wav", fs, soundsc(vq_d2, fs)) | |
| agc_d1, freq_d1, energy_d1 = time_attack_agc(fix_d1, fs, .5, 5) | |
| agc_d2, freq_d2, energy_d2 = time_attack_agc(fix_d2, fs, .5, 5) | |
| agc_vq_d1, freq_vq_d1, energy_vq_d1 = time_attack_agc(vq_d1, fs, .5, 5) | |
| agc_vq_d2, freq_vq_d2, energy_vq_d2 = time_attack_agc(vq_d2, fs, .5, 5) | |
| wavfile.write("fft_train_agc.wav", fs, soundsc(agc_d1)) | |
| wavfile.write("fft_test_agc.wav", fs, soundsc(agc_d2)) | |
| wavfile.write("fft_vq_train_agc.wav", fs, soundsc(agc_vq_d1, fs)) | |
| wavfile.write("fft_vq_test_agc.wav", fs, soundsc(agc_vq_d2)) | |
| def run_dct_vq_example(): | |
| def _pre(list_of_data): | |
| # Temporal window setting is crucial! - 512 seems OK for music, 256 | |
| # fruit perhaps due to samplerates | |
| n_dct = 512 | |
| f_r = np.vstack([mdct_slow(dd, n_dct) for dd in list_of_data]) | |
| return f_r, n_dct | |
| def preprocess_train(list_of_data, random_state): | |
| f_r, n_dct = _pre(list_of_data) | |
| clusters = f_r | |
| return clusters | |
| def apply_preprocess(list_of_data, clusters): | |
| f_r, n_dct = _pre(list_of_data) | |
| f_clust = f_r | |
| memberships, distances = vq(f_clust, clusters) | |
| vq_r = clusters[memberships] | |
| d_k = imdct_slow(vq_r, n_dct) | |
| return d_k | |
| random_state = np.random.RandomState(1999) | |
| # This doesn't work very well due to only taking a sample from the end as | |
| # test | |
| fs, d = fetch_sample_music() | |
| sub = int(.8 * d.shape[0]) | |
| d1 = [d[:sub]] | |
| d2 = [d[sub:]] | |
| """ | |
| fs, d = fetch_sample_speech_fruit() | |
| d1 = d[::8] + d[1::8] + d[2::8] + d[3::8] + d[4::8] + d[5::8] + d[6::8] | |
| d2 = d[7::8] | |
| # make sure d1 and d2 aren't the same! | |
| assert [len(di) for di in d1] != [len(di) for di in d2] | |
| """ | |
| clusters = preprocess_train(d1, random_state) | |
| # Training data | |
| vq_d1 = apply_preprocess(d1, clusters) | |
| vq_d2 = apply_preprocess(d2, clusters) | |
| assert [i != j for i, j in zip(vq_d2.ravel(), vq_d2.ravel())] | |
| fix_d1 = np.concatenate(d1) | |
| fix_d2 = np.concatenate(d2) | |
| wavfile.write("dct_train_no_agc.wav", fs, soundsc(fix_d1)) | |
| wavfile.write("dct_test_no_agc.wav", fs, soundsc(fix_d2)) | |
| wavfile.write("dct_vq_train_no_agc.wav", fs, soundsc(vq_d1)) | |
| wavfile.write("dct_vq_test_no_agc.wav", fs, soundsc(vq_d2)) | |
| """ | |
| import matplotlib.pyplot as plt | |
| plt.specgram(vq_d2, cmap="gray") | |
| plt.figure() | |
| plt.specgram(fix_d2, cmap="gray") | |
| plt.show() | |
| """ | |
| agc_d1, freq_d1, energy_d1 = time_attack_agc(fix_d1, fs, .5, 5) | |
| agc_d2, freq_d2, energy_d2 = time_attack_agc(fix_d2, fs, .5, 5) | |
| agc_vq_d1, freq_vq_d1, energy_vq_d1 = time_attack_agc(vq_d1, fs, .5, 5) | |
| agc_vq_d2, freq_vq_d2, energy_vq_d2 = time_attack_agc(vq_d2, fs, .5, 5) | |
| wavfile.write("dct_train_agc.wav", fs, soundsc(agc_d1)) | |
| wavfile.write("dct_test_agc.wav", fs, soundsc(agc_d2)) | |
| wavfile.write("dct_vq_train_agc.wav", fs, soundsc(agc_vq_d1)) | |
| wavfile.write("dct_vq_test_agc.wav", fs, soundsc(agc_vq_d2)) | |
| def run_phase_reconstruction_example(): | |
| fs, d = fetch_sample_speech_tapestry() | |
| # actually gives however many components you say! So double what .m file | |
| # says | |
| fftsize = 512 | |
| step = 64 | |
| X_s = np.abs(stft(d, fftsize=fftsize, step=step, real=False, | |
| compute_onesided=False)) | |
| X_t = iterate_invert_spectrogram(X_s, fftsize, step, verbose=True) | |
| """ | |
| import matplotlib.pyplot as plt | |
| plt.specgram(d, cmap="gray") | |
| plt.savefig("1.png") | |
| plt.close() | |
| plt.imshow(X_s, cmap="gray") | |
| plt.savefig("2.png") | |
| plt.close() | |
| """ | |
| wavfile.write("phase_original.wav", fs, soundsc(d)) | |
| wavfile.write("phase_reconstruction.wav", fs, soundsc(X_t)) | |
| def run_phase_vq_example(): | |
| def _pre(list_of_data): | |
| # Temporal window setting is crucial! - 512 seems OK for music, 256 | |
| # fruit perhaps due to samplerates | |
| n_fft = 256 | |
| step = 32 | |
| f_r = np.vstack([np.abs(stft(dd, n_fft, step=step, real=False, | |
| compute_onesided=False)) | |
| for dd in list_of_data]) | |
| return f_r, n_fft, step | |
| def preprocess_train(list_of_data, random_state): | |
| f_r, n_fft, step = _pre(list_of_data) | |
| clusters = copy.deepcopy(f_r) | |
| return clusters | |
| def apply_preprocess(list_of_data, clusters): | |
| f_r, n_fft, step = _pre(list_of_data) | |
| f_clust = f_r | |
| # Nondeterministic ? | |
| memberships, distances = vq(f_clust, clusters) | |
| vq_r = clusters[memberships] | |
| d_k = iterate_invert_spectrogram(vq_r, n_fft, step, verbose=True) | |
| return d_k | |
| random_state = np.random.RandomState(1999) | |
| fs, d = fetch_sample_speech_fruit() | |
| d1 = d[::9] | |
| d2 = d[7::8][:5] | |
| # make sure d1 and d2 aren't the same! | |
| assert [len(di) for di in d1] != [len(di) for di in d2] | |
| clusters = preprocess_train(d1, random_state) | |
| fix_d1 = np.concatenate(d1) | |
| fix_d2 = np.concatenate(d2) | |
| vq_d2 = apply_preprocess(d2, clusters) | |
| wavfile.write("phase_train_no_agc.wav", fs, soundsc(fix_d1)) | |
| wavfile.write("phase_vq_test_no_agc.wav", fs, soundsc(vq_d2)) | |
| agc_d1, freq_d1, energy_d1 = time_attack_agc(fix_d1, fs, .5, 5) | |
| agc_d2, freq_d2, energy_d2 = time_attack_agc(fix_d2, fs, .5, 5) | |
| agc_vq_d2, freq_vq_d2, energy_vq_d2 = time_attack_agc(vq_d2, fs, .5, 5) | |
| """ | |
| import matplotlib.pyplot as plt | |
| plt.specgram(agc_vq_d2, cmap="gray") | |
| #plt.title("Fake") | |
| plt.figure() | |
| plt.specgram(agc_d2, cmap="gray") | |
| #plt.title("Real") | |
| plt.show() | |
| """ | |
| wavfile.write("phase_train_agc.wav", fs, soundsc(agc_d1)) | |
| wavfile.write("phase_test_agc.wav", fs, soundsc(agc_d2)) | |
| wavfile.write("phase_vq_test_agc.wav", fs, soundsc(agc_vq_d2)) | |
| def run_cqt_example(): | |
| fs, d = fetch_sample_file("/Users/User/cqt_resources/kempff1.wav") | |
| X = d[:44100] | |
| X_cq, c_dc, c_nyq, multiscale, shift, window_lens = cqt(X, fs) | |
| X_r = icqt(X_cq, c_dc, c_nyq, multiscale, shift, window_lens) | |
| SNR = 20 * np.log10(np.linalg.norm(X - X_r) / np.linalg.norm(X)) | |
| wavfile.write("cqt_original.wav", fs, soundsc(X)) | |
| wavfile.write("cqt_reconstruction.wav", fs, soundsc(X_r)) | |
| def run_fft_dct_example(): | |
| random_state = np.random.RandomState(1999) | |
| fs, d = fetch_sample_speech_fruit() | |
| n_fft = 64 | |
| X = d[0] | |
| X_stft = stft(X, n_fft) | |
| X_rr = complex_to_real_view(X_stft) | |
| X_dct = fftpack.dct(X_rr, axis=-1, norm='ortho') | |
| X_dct_sub = X_dct[1:] - X_dct[:-1] | |
| std = X_dct_sub.std(axis=0, keepdims=True) | |
| X_dct_sub += .01 * std * random_state.randn( | |
| X_dct_sub.shape[0], X_dct_sub.shape[1]) | |
| X_dct_unsub = np.cumsum(X_dct_sub, axis=0) | |
| X_idct = fftpack.idct(X_dct_unsub, axis=-1, norm='ortho') | |
| X_irr = real_to_complex_view(X_idct) | |
| X_r = istft(X_irr, n_fft)[:len(X)] | |
| SNR = 20 * np.log10(np.linalg.norm(X - X_r) / np.linalg.norm(X)) | |
| print(SNR) | |
| wavfile.write("fftdct_orig.wav", fs, soundsc(X)) | |
| wavfile.write("fftdct_rec.wav", fs, soundsc(X_r)) | |
| if __name__ == "__main__": | |
| """ | |
| Trying to run all examples will seg fault on my laptop - probably memory! | |
| Comment individually | |
| """ | |
| # run_phase_reconstruction_example() | |
| # run_phase_vq_example() | |
| # run_dct_vq_example() | |
| # run_fft_vq_example() | |
| # run_lpc_example() | |
| # run_cqt_example() | |
| run_fft_dct_example() | |
| test_all() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment