# Audio tools for numpy/python. Constant work in progress.
# License: BSD 3-clause
# Authors: Kyle Kastner
import numpy as np
# Do numpy version check for <= 1.9! Something crazy is going on in 1.10
if int(np.__version__.split(".")[1]) >= 10:
print("Only numpy <= 1.9 currently supported! There is a "
"numerical error in one of the numpy 1.10 routines. "
"Hopefully this will be debugged an corrected soon. "
"For the intrepid, the error can be seen by running"
"run_phase_reconstruction()")
from numpy.lib.stride_tricks import as_strided
import scipy.signal as sg
from scipy.cluster.vq import vq
from scipy import linalg, fftpack
from numpy.testing import assert_almost_equal
from scipy.linalg import svd
from scipy.io import wavfile
from scipy.signal import firwin
import zipfile
import tarfile
import os
import copy
try:
import urllib.request as urllib  # Python 3; fall back to urllib2 on Python 2
except ImportError:
import urllib2 as urllib
def download(url, server_fname, local_fname=None, progress_update_percentage=5,
bypass_certificate_check=False):
"""
An internet download utility modified from
http://stackoverflow.com/questions/22676/
how-do-i-download-a-file-over-http-using-python/22776#22776
"""
if bypass_certificate_check:
import ssl
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
u = urllib.urlopen(url, context=ctx)
else:
u = urllib.urlopen(url)
if local_fname is None:
local_fname = server_fname
full_path = local_fname
meta = u.info()
with open(full_path, 'wb') as f:
try:
file_size = int(meta.get("Content-Length"))
except TypeError:
print("WARNING: Cannot get file size, displaying bytes instead!")
file_size = 100
print("Downloading: %s Bytes: %s" % (server_fname, file_size))
file_size_dl = 0
block_sz = int(1E7)
p = 0
while True:
buffer = u.read(block_sz)
if not buffer:
break
file_size_dl += len(buffer)
f.write(buffer)
if (file_size_dl * 100. / file_size) > p:
status = r"%10d [%3.2f%%]" % (file_size_dl, file_size_dl *
100. / file_size)
print(status)
p += progress_update_percentage
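# e.g. download("https://example.com/audio.wav", "audio.wav") saves the file
# to the working directory, printing progress every 5% (URL is illustrative).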
def fetch_sample_speech_tapestry():
url = "https://www.dropbox.com/s/qte66a7haqspq2g/tapestry.wav?dl=1"
wav_path = "tapestry.wav"
if not os.path.exists(wav_path):
download(url, wav_path)
fs, d = wavfile.read(wav_path)
d = d.astype('float32') / (2 ** 15)
# file appears to be mono - no channel selection needed
return fs, d
def fetch_sample_file(wav_path):
if not os.path.exists(wav_path):
raise ValueError("Unable to find file at path %s" % wav_path)
fs, d = wavfile.read(wav_path)
d = d.astype('float32') / (2 ** 15)
# file is stereo - just choose one channel
if len(d.shape) > 1:
d = d[:, 0]
return fs, d
def fetch_sample_music():
url = "http://www.music.helsinki.fi/tmt/opetus/uusmedia/esim/"
url += "a2002011001-e02-16kHz.wav"
wav_path = "test.wav"
if not os.path.exists(wav_path):
download(url, wav_path)
fs, d = wavfile.read(wav_path)
d = d.astype('float32') / (2 ** 15)
# file is stereo - just choose one channel
d = d[:, 0]
return fs, d
def fetch_sample_speech_fruit(n_samples=None):
url = 'https://dl.dropboxusercontent.com/u/15378192/audio.tar.gz'
wav_path = "audio.tar.gz"
if not os.path.exists(wav_path):
download(url, wav_path)
tf = tarfile.open(wav_path)
wav_names = [fname for fname in tf.getnames()
if ".wav" in fname.split(os.sep)[-1]]
speech = []
print("Loading speech files...")
for wav_name in wav_names[:n_samples]:
f = tf.extractfile(wav_name)
fs, d = wavfile.read(f)
d = d.astype('float32') / (2 ** 15)
speech.append(d)
return fs, speech
def fetch_sample_speech_eustace(n_samples=None):
"""
http://www.cstr.ed.ac.uk/projects/eustace/download.html
"""
# data
url = "http://www.cstr.ed.ac.uk/projects/eustace/down/eustace_wav.zip"
wav_path = "eustace_wav.zip"
if not os.path.exists(wav_path):
download(url, wav_path)
# labels
url = "http://www.cstr.ed.ac.uk/projects/eustace/down/eustace_labels.zip"
labels_path = "eustace_labels.zip"
if not os.path.exists(labels_path):
download(url, labels_path)
# Read wavfiles
# 16 kHz wav
zf = zipfile.ZipFile(wav_path, 'r')
wav_names = [fname for fname in zf.namelist()
if ".wav" in fname.split(os.sep)[-1]]
fs = 16000
speech = []
print("Loading speech files...")
for wav_name in wav_names[:n_samples]:
wav_str = zf.read(wav_name)
d = np.frombuffer(wav_str, dtype=np.int16)
d = d.astype('float32') / (2 ** 15)
speech.append(d)
zf = zipfile.ZipFile(labels_path, 'r')
label_names = [fname for fname in zf.namelist()
if ".lab" in fname.split(os.sep)[-1]]
labels = []
print("Loading label files...")
for label_name in label_names[:n_samples]:
label_file_str = zf.read(label_name)
labels.append(label_file_str)
return fs, speech
def stft(X, fftsize=128, step="half", mean_normalize=True, real=False,
compute_onesided=True):
"""
Compute STFT for 1D real valued input X
"""
if real:
local_fft = np.fft.rfft
cut = -1
else:
local_fft = np.fft.fft
cut = None
if compute_onesided:
cut = fftsize // 2
if mean_normalize:
X -= X.mean()
if step == "half":
X = halfoverlap(X, fftsize)
else:
X = overlap(X, fftsize, step)
size = fftsize
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1))
X = X * win[None]
X = local_fft(X)[:, :cut]
return X
def istft(X, fftsize=128, mean_normalize=True, real=False,
compute_onesided=True):
"""
Compute ISTFT for STFT transformed X
"""
if real:
local_ifft = np.fft.irfft
X_pad = np.zeros((X.shape[0], X.shape[1] + 1)) + 0j
X_pad[:, :-1] = X
X = X_pad
else:
local_ifft = np.fft.ifft
if compute_onesided:
X_pad = np.zeros((X.shape[0], 2 * X.shape[1])) + 0j
X_pad[:, :fftsize // 2] = X
X_pad[:, fftsize // 2:] = 0
X = X_pad
X = local_ifft(X).astype("float64")
X = invert_halfoverlap(X)
if mean_normalize:
X -= np.mean(X)
return X
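# A minimal round-trip sketch (added for illustration, not part of the
# original gist): stft followed by istft should approximately reconstruct
# the input, up to mean removal and edge framing effects.
def example_stft_istft_roundtrip():
    fs, d = fetch_sample_speech_tapestry()
    X_s = stft(d, fftsize=512)
    d_r = istft(X_s, fftsize=512)
    # Compare only the overlapping region - framing pads the edges
    n = min(len(d), len(d_r))
    print("Max abs STFT reconstruction error: %f" % np.abs(d[:n] - d_r[:n]).max())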
def mdct_slow(X, dctsize=128):
M = dctsize
N = 2 * dctsize
N_0 = (M + 1) / 2
X = halfoverlap(X, N)
X = sine_window(X)
n, k = np.meshgrid(np.arange(N), np.arange(M))
# Use transpose due to "samples as rows" convention
tf = np.cos(np.pi * (n + N_0) * (k + 0.5) / M).T
return np.dot(X, tf)
def imdct_slow(X, dctsize=128):
M = dctsize
N = 2 * dctsize
N_0 = (M + 1) / 2
N_4 = N / 4
n, k = np.meshgrid(np.arange(N), np.arange(M))
# inverse *is not* transposed
tf = np.cos(np.pi * (n + N_0) * (k + 0.5) / M)
X_r = np.dot(X, tf) / N_4
X_r = sine_window(X_r)
X = invert_halfoverlap(X_r)
return X
def nsgcwin(fmin, fmax, n_bins, fs, signal_len, gamma):
"""
Nonstationary Gabor window calculation
References
----------
Velasco G. A., Holighaus N., Dörfler M., Grill T.
Constructing an invertible constant-Q transform with nonstationary Gabor
frames, Proceedings of the 14th International Conference on Digital
Audio Effects (DAFx 11), Paris, France, 2011
Holighaus N., Dörfler M., Velasco G. A. and Grill T.
A framework for invertible, real-time constant-Q transforms, submitted.
Original matlab code copyright follows:
AUTHOR(s) : Monika Dörfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011
COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA
http://nuhag.eu/
Permission is granted to modify and re-distribute this
code in any manner as long as this notice is preserved.
All standard disclaimers apply.
"""
# use a hanning window
# no fractional shifts
fftres = fs / signal_len
fmin = float(fmin)
fmax = float(fmax)
gamma = float(gamma)
nyq = fs / 2.
b = np.floor(n_bins * np.log2(fmax / fmin))
fbas = fmin * 2 ** (np.arange(b + 1) / float(n_bins))
Q = 2 ** (1. / n_bins) - 2 ** (-1. / n_bins)
cqtbw = Q * fbas + gamma
cqtbw = cqtbw.ravel()
maxidx = np.where(fbas + cqtbw / 2. > nyq)[0]
if len(maxidx) > 0:
# replicate bug in MATLAB version...
# or is it a feature
if sum(maxidx) == 0:
first = len(cqtbw) - 1
else:
first = maxidx[0]
fbas = fbas[:first]
cqtbw = cqtbw[:first]
minidx = np.where(fbas - cqtbw / 2. < 0)[0]
if len(minidx) > 0:
fbas = fbas[minidx[-1]+1:]
cqtbw = cqtbw[minidx[-1]+1:]
fbas_len = len(fbas)
fbas_new = np.zeros((2 * (len(fbas) + 1)))
fbas_new[1:len(fbas) + 1] = fbas
fbas = fbas_new
fbas[fbas_len + 1] = nyq
fbas[fbas_len + 2:] = fs - fbas[1:fbas_len + 1][::-1]
bw = np.zeros_like(fbas)
bw[0] = 2 * fmin
bw[1:len(cqtbw) + 1] = cqtbw
bw[len(cqtbw) + 1] = fbas[fbas_len + 2] - fbas[fbas_len]
bw[-len(cqtbw):] = cqtbw[::-1]
bw = bw / fftres
fbas = fbas / fftres
posit = np.zeros_like(fbas)
posit[:fbas_len + 2] = np.floor(fbas[:fbas_len + 2])
posit[fbas_len + 2:] = np.ceil(fbas[fbas_len + 2:])
base_shift = -posit[-1] % signal_len
shift = np.zeros_like(posit).astype("int32")
shift[1:] = (posit[1:] - posit[:-1]).astype("int32")
shift[0] = base_shift
bw = np.round(bw)
bwfac = 1
M = bw
min_win = 4
for ii in range(len(bw)):
if bw[ii] < min_win:
bw[ii] = min_win
M[ii] = bw[ii]
def _win(numel):
if numel % 2 == 0:
s1 = np.arange(0, .5, 1. / numel)
if len(s1) != numel // 2:
# edge case with small floating point numbers...
s1 = s1[:-1]
s2 = np.arange(-.5, 0, 1. / numel)
if len(s2) != numel // 2:
# edge case with small floating point numbers...
s2 = s2[:-1]
x = np.concatenate((s1, s2))
else:
s1 = np.arange(0, .5, 1. / numel)
s2 = np.arange(-.5 + .5 / numel, 0, 1. / numel)
if len(s2) != numel // 2: # assume integer truncate 27 // 2 = 13
s2 = s2[:-1]
x = np.concatenate((s1, s2))
assert len(x) == numel
g = .5 + .5 * np.cos(2 * np.pi * x)
return g
multiscale = [_win(bi) for bi in bw]
bw = bwfac * np.ceil(M / bwfac)
for kk in [0, fbas_len + 1]:
if M[kk] > M[kk + 1]:
multiscale[kk] = np.ones(M[kk]).astype(multiscale[0].dtype)
i1 = int(np.floor(M[kk] / 2) - np.floor(M[kk + 1] / 2))
i2 = int(np.floor(M[kk] / 2) + np.ceil(M[kk + 1] / 2))
# Very rarely, gets an off by 1 error? Seems to be at the end...
# for now, slice
multiscale[kk][i1:i2] = _win(M[kk + 1])
multiscale[kk] = multiscale[kk] / np.sqrt(M[kk])
return multiscale, shift, M
def nsgtf_real(X, multiscale, shift, window_lens):
"""
Nonstationary Gabor Transform for real values
References
----------
Velasco G. A., Holighaus N., Dörfler M., Grill T.
Constructing an invertible constant-Q transform with nonstationary Gabor
frames, Proceedings of the 14th International Conference on Digital
Audio Effects (DAFx 11), Paris, France, 2011
Holighaus N., Dörfler M., Velasco G. A. and Grill T.
A framework for invertible, real-time constant-Q transforms, submitted.
Original matlab code copyright follows:
AUTHOR(s) : Monika Dörfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011
COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA
http://nuhag.eu/
Permission is granted to modify and re-distribute this
code in any manner as long as this notice is preserved.
All standard disclaimers apply.
"""
# This will break with multchannel input
signal_len = len(X)
N = len(shift)
X_fft = np.fft.fft(X)
fill = np.sum(shift) - signal_len
if fill > 0:
X_fft_tmp = np.zeros((signal_len + fill)).astype(X_fft.dtype)
X_fft_tmp[:len(X_fft)] = X_fft
X_fft = X_fft_tmp
posit = np.cumsum(shift) - shift[0]
scale_lens = np.array([len(m) for m in multiscale])
N = np.where(posit - np.floor(scale_lens) <= (signal_len + fill) / 2)[0][-1]
c = []
# c[0] is almost exact
for ii in range(N):
idx_l = np.arange(np.ceil(scale_lens[ii] / 2), scale_lens[ii])
idx_r = np.arange(np.ceil(scale_lens[ii] / 2))
idx = np.concatenate((idx_l, idx_r))
idx = idx.astype("int32")
subwin_range = posit[ii] + np.arange(-np.floor(scale_lens[ii] / 2),
np.ceil(scale_lens[ii] / 2))
win_range = subwin_range % (signal_len + fill)
win_range = win_range.astype("int32")
if window_lens[ii] < scale_lens[ii]:
raise ValueError("Not handling 'not enough channels' case")
else:
temp = np.zeros((window_lens[ii],)).astype(X_fft.dtype)
temp_idx_l = np.arange(len(temp) - np.floor(scale_lens[ii] / 2),
len(temp))
temp_idx_r = np.arange(np.ceil(scale_lens[ii] / 2))
temp_idx = np.concatenate((temp_idx_l, temp_idx_r))
temp_idx = temp_idx.astype("int32")
temp[temp_idx] = X_fft[win_range] * multiscale[ii][idx]
fs_new_bins = window_lens[ii]
fk_bins = posit[ii]
displace = fk_bins - np.floor(fk_bins / fs_new_bins) * fs_new_bins
displace = displace.astype("int32")
temp = np.roll(temp, displace)
c.append(np.fft.ifft(temp))
if 0:
# cell2mat concatenation
c = np.concatenate(c)
return c
def nsdual(multiscale, shift, window_lens):
"""
Calculation of nonstationary inverse Gabor filters
References
----------
Velasco G. A., Holighaus N., Dörfler M., Grill T.
Constructing an invertible constant-Q transform with nonstationary Gabor
frames, Proceedings of the 14th International Conference on Digital
Audio Effects (DAFx 11), Paris, France, 2011
Holighaus N., Dörfler M., Velasco G. A. and Grill T.
A framework for invertible, real-time constant-Q transforms, submitted.
Original matlab code copyright follows:
AUTHOR(s) : Monika Dörfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011
COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA
http://nuhag.eu/
Permission is granted to modify and re-distribute this
code in any manner as long as this notice is preserved.
All standard disclaimers apply.
"""
N = len(shift)
posit = np.cumsum(shift)
seq_len = posit[-1]
posit = posit - shift[0]
diagonal = np.zeros((seq_len,))
win_range = []
for ii in range(N):
filt_len = len(multiscale[ii])
idx = np.arange(-np.floor(filt_len / 2), np.ceil(filt_len / 2))
win_range.append((posit[ii] + idx) % seq_len)
subdiag = window_lens[ii] * np.fft.fftshift(multiscale[ii]) ** 2
ind = win_range[ii].astype(np.int)
diagonal[ind] = diagonal[ind] + subdiag
dual_multiscale = multiscale
for ii in range(N):
ind = win_range[ii].astype(np.int)
dual_multiscale[ii] = np.fft.ifftshift(
np.fft.fftshift(dual_multiscale[ii]) / diagonal[ind])
return dual_multiscale
def nsgitf_real(c, c_dc, c_nyq, multiscale, shift):
"""
Nonstationary Inverse Gabor Transform on real valued signal
References
----------
Velasco G. A., Holighaus N., Dörfler M., Grill T.
Constructing an invertible constant-Q transform with nonstationary Gabor
frames, Proceedings of the 14th International Conference on Digital
Audio Effects (DAFx 11), Paris, France, 2011
Holighaus N., Dörfler M., Velasco G. A. and Grill T.
A framework for invertible, real-time constant-Q transforms, submitted.
Original matlab code copyright follows:
AUTHOR(s) : Monika Dörfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011
COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA
http://nuhag.eu/
Permission is granted to modify and re-distribute this
code in any manner as long as this notice is preserved.
All standard disclaimers apply.
"""
c_l = []
c_l.append(c_dc)
c_l.extend([ci for ci in c])
c_l.append(c_nyq)
posit = np.cumsum(shift)
seq_len = posit[-1]
posit -= shift[0]
out = np.zeros((seq_len,)).astype(c_l[1].dtype)
for ii in range(len(c_l)):
filt_len = len(multiscale[ii])
win_range = posit[ii] + np.arange(-np.floor(filt_len / 2),
np.ceil(filt_len / 2))
win_range = (win_range % seq_len).astype(np.int)
temp = np.fft.fft(c_l[ii]) * len(c_l[ii])
fs_new_bins = len(c_l[ii])
fk_bins = posit[ii]
displace = int(fk_bins - np.floor(fk_bins / fs_new_bins) * fs_new_bins)
temp = np.roll(temp, -displace)
l = np.arange(len(temp) - np.floor(filt_len / 2), len(temp))
r = np.arange(np.ceil(filt_len / 2))
temp_idx = (np.concatenate((l, r)) % len(temp)).astype(np.int)
temp = temp[temp_idx]
lf = np.arange(filt_len - np.floor(filt_len / 2), filt_len)
rf = np.arange(np.ceil(filt_len / 2))
filt_idx = np.concatenate((lf, rf)).astype(np.int)
m = multiscale[ii][filt_idx]
out[win_range] = out[win_range] + m * temp
nyq_bin = int(np.floor(seq_len / 2) + 1)
out_idx = np.arange(
nyq_bin - np.abs(1 - seq_len % 2) - 1, 0, -1).astype(np.int)
out[nyq_bin:] = np.conj(out[out_idx])
t_out = np.real(np.fft.ifft(out)).astype(np.float64)
return t_out
def cqt(X, fs, n_bins=48, fmin=27.5, fmax="nyq", gamma=20):
"""
Constant Q Transform
References
----------
Velasco G. A., Holighaus N., Dörfler M., Grill T.
Constructing an invertible constant-Q transform with nonstationary Gabor
frames, Proceedings of the 14th International Conference on Digital
Audio Effects (DAFx 11), Paris, France, 2011
Holighaus N., Dörfler M., Velasco G. A. and Grill T.
A framework for invertible, real-time constant-Q transforms, submitted.
Original matlab code copyright follows:
AUTHOR(s) : Monika Dörfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011
COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA
http://nuhag.eu/
Permission is granted to modify and re-distribute this
code in any manner as long as this notice is preserved.
All standard disclaimers apply.
"""
if fmax == "nyq":
fmax = fs / 2.
multiscale, shift, window_lens = nsgcwin(fmin, fmax, n_bins, fs,
len(X), gamma)
fbas = fs * np.cumsum(shift[1:]) / len(X)
fbas = fbas[:len(window_lens) // 2 - 1]
bins = window_lens.shape[0] // 2 - 1
window_lens[1:bins + 1] = window_lens[bins + 2]
window_lens[bins + 2:] = window_lens[1:bins + 1][::-1]
norm = 2. * window_lens[:bins + 2] / float(len(X))
norm = np.concatenate((norm, norm[1:-1][::-1]))
multiscale = [norm[ii] * multiscale[ii] for ii in range(2 * (bins + 1))]
c = nsgtf_real(X, multiscale, shift, window_lens)
c_dc = c[0]
c_nyq = c[-1]
c_sub = c[1:-1]
c = np.vstack(c_sub)
return c, c_dc, c_nyq, multiscale, shift, window_lens
def icqt(X_cq, c_dc, c_nyq, multiscale, shift, window_lens):
"""
Inverse constant Q Transform
References
----------
Velasco G. A., Holighaus N., Dörfler M., Grill T.
Constructing an invertible constant-Q transform with nonstationary Gabor
frames, Proceedings of the 14th International Conference on Digital
Audio Effects (DAFx 11), Paris, France, 2011
Holighaus N., Dörfler M., Velasco G. A. and Grill T.
A framework for invertible, real-time constant-Q transforms, submitted.
Original matlab code copyright follows:
AUTHOR(s) : Monika Dörfler, Gino Angelo Velasco, Nicki Holighaus, 2010-2011
COPYRIGHT : (c) NUHAG, Dept.Math., University of Vienna, AUSTRIA
http://nuhag.eu/
Permission is granted to modify and re-distribute this
code in any manner as long as this notice is preserved.
All standard disclaimers apply.
"""
new_multiscale = nsdual(multiscale, shift, window_lens)
X = nsgitf_real(X_cq, c_dc, c_nyq, new_multiscale, shift)
return X
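# A short usage sketch (added; follows the parameter defaults above rather
# than any example from the original toolbox): forward CQT of a speech
# snippet, then inverse. Reconstruction is close but not bit exact.
def example_cqt_icqt_roundtrip():
    fs, d = fetch_sample_speech_tapestry()
    X = d[:fs]  # roughly one second of audio
    c, c_dc, c_nyq, multiscale, shift, window_lens = cqt(X, fs)
    X_r = icqt(c, c_dc, c_nyq, multiscale, shift, window_lens)
    n = min(len(X), len(X_r))
    print("Mean abs CQT reconstruction error: %f" % np.abs(X_r[:n] - X[:n]).mean())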
def rolling_mean(X, window_size):
w = 1.0 / window_size * np.ones((window_size))
return np.correlate(X, w, 'valid')
def voiced_unvoiced(X, window_size=256, window_step=128, copy=True):
"""
Voiced unvoiced detection from a raw signal
Based on code from:
https://www.clear.rice.edu/elec532/PROJECTS96/lpc/code.html
Other references:
http://www.seas.ucla.edu/spapl/code/harmfreq_MOLRT_VAD.m
Parameters
----------
X : ndarray
Raw input signal
window_size : int, optional (default=256)
The window size to use, in samples.
window_step : int, optional (default=128)
How far the window steps after each calculation, in samples.
copy : bool, optional (default=True)
Whether to make a copy of the input array or allow in place changes.
"""
X = np.array(X, copy=copy)
if len(X.shape) < 2:
X = X[None]
n_points = X.shape[1]
n_windows = n_points // window_step
# Padding
pad_sizes = [(window_size - window_step) // 2,
window_size - window_step // 2]
# TODO: Handling for odd window sizes / steps
X = np.hstack((np.zeros((X.shape[0], pad_sizes[0])), X,
np.zeros((X.shape[0], pad_sizes[1]))))
clipping_factor = 0.6
b, a = sg.butter(10, np.pi * 9 / 40)
voiced_unvoiced = np.zeros((n_windows, 1))
period = np.zeros((n_windows, 1))
for window in range(max(n_windows - 1, 1)):
XX = X.ravel()[window * window_step + np.arange(window_size)]
XX *= sg.hamming(len(XX))
XX = sg.lfilter(b, a, XX)
left_max = np.max(np.abs(XX[:len(XX) // 3]))
right_max = np.max(np.abs(XX[-len(XX) // 3:]))
clip_value = clipping_factor * np.min([left_max, right_max])
XX_clip = np.clip(XX, -clip_value, clip_value)
XX_corr = np.correlate(XX_clip, XX_clip, mode='full')
center = np.argmax(XX_corr)
right_XX_corr = XX_corr[center:]
prev_window = max([window - 1, 0])
if voiced_unvoiced[prev_window] > 0:
# Want it to be harder to turn off than turn on
strength_factor = .29
else:
strength_factor = .3
start = np.where(right_XX_corr < .3 * XX_corr[center])[0]
# 20 is hardcoded but should depend on samplerate?
start = np.max([20, start[0]])
search_corr = right_XX_corr[start:]
index = np.argmax(search_corr)
second_max = search_corr[index]
if (second_max > strength_factor * XX_corr[center]):
voiced_unvoiced[window] = 1
period[window] = start + index - 1
else:
voiced_unvoiced[window] = 0
period[window] = 0
return np.array(voiced_unvoiced), np.array(period)
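# Usage sketch (added): estimate voiced/unvoiced decisions and pitch periods
# per analysis window for a speech sample.
def example_voiced_unvoiced():
    fs, d = fetch_sample_speech_tapestry()
    v, p = voiced_unvoiced(d, window_size=256, window_step=128)
    print("Voiced fraction: %f" % v.mean())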
def lpc_analysis(X, order=8, window_step=128, window_size=2 * 128,
emphasis=0.9, voiced_start_threshold=.9,
voiced_stop_threshold=.6, truncate=False, copy=True):
"""
Extract LPC coefficients from a signal
Based on code from:
http://labrosa.ee.columbia.edu/matlab/sws/
Parameters
----------
X : ndarray
Signals to extract LPC coefficients from
order : int, optional (default=8)
Order of the LPC coefficients. For speech, use the general rule that the
order is two times the expected number of formants plus 2.
This can be formulated as 2 + 2 * (fs // 2000). For example, a signal
with fs = 7000 gives 8 coefficients: 2 + 2 * (7000 // 2000).
window_step : int, optional (default=128)
The size (in samples) of the space between each window
window_size : int, optional (default=2 * 128)
The size of each window (in samples) to extract coefficients over
emphasis : float, optional (default=0.9)
The emphasis coefficient to use for filtering
voiced_start_threshold : float, optional (default=0.9)
Upper power threshold for estimating when speech has started
voiced_stop_threshold : float, optional (default=0.6)
Lower power threshold for estimating when speech has stopped
truncate : bool, optional (default=False)
Whether to cut the data at the last window or do zero padding.
copy : bool, optional (default=True)
Whether to copy the input X or modify in place
Returns
-------
lp_coefficients : ndarray
lp coefficients to describe the frame
per_frame_gain : ndarray
calculated gain for each frame
residual_excitation : ndarray
leftover energy which is not described by lp coefficients and gain
voiced_frames : ndarray
array of [0, 1] values which holds voiced/unvoiced decision for each
frame.
References
----------
D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
"""
X = np.array(X, copy=copy)
if len(X.shape) < 2:
X = X[None]
n_points = X.shape[1]
n_windows = n_points // window_step
if not truncate:
pad_sizes = [(window_size - window_step) // 2,
window_size - window_step // 2]
# TODO: Handling for odd window sizes / steps
X = np.hstack((np.zeros((X.shape[0], pad_sizes[0])), X,
np.zeros((X.shape[0], pad_sizes[1]))))
else:
pad_sizes = [0, 0]
X = X[0, :n_windows * window_step]
lp_coefficients = np.zeros((n_windows, order + 1))
per_frame_gain = np.zeros((n_windows, 1))
residual_excitation = np.zeros(
((n_windows - 1) * window_step + window_size))
# Pre-emphasis high-pass filter
X = sg.lfilter([1, -emphasis], 1, X)
# stride_tricks.as_strided?
autocorr_X = np.zeros((n_windows, 2 * window_size - 1))
for window in range(max(n_windows - 1, 1)):
XX = X.ravel()[window * window_step + np.arange(window_size)]
WXX = XX * sg.hanning(window_size)
autocorr_X[window] = np.correlate(WXX, WXX, mode='full')
center = np.argmax(autocorr_X[window])
RXX = autocorr_X[window,
np.arange(center, window_size + order)]
R = linalg.toeplitz(RXX[:-1])
solved_R = linalg.pinv(R).dot(RXX[1:])
filter_coefs = np.hstack((1, -solved_R))
residual_signal = sg.lfilter(filter_coefs, 1, WXX)
gain = np.sqrt(np.mean(residual_signal ** 2))
lp_coefficients[window] = filter_coefs
per_frame_gain[window] = gain
assign_range = window * window_step + np.arange(window_size)
residual_excitation[assign_range] += residual_signal / gain
# Throw away first part in overlap mode for proper synthesis
residual_excitation = residual_excitation[pad_sizes[0]:]
return lp_coefficients, per_frame_gain, residual_excitation
def lpc_synthesis(lp_coefficients, per_frame_gain, residual_excitation=None,
voiced_frames=None, window_step=128, emphasis=0.9):
"""
Synthesize a signal from LPC coefficients
Based on code from:
http://labrosa.ee.columbia.edu/matlab/sws/
http://web.uvic.ca/~tyoon/resource/auditorytoolbox/auditorytoolbox/synlpc.html
Parameters
----------
lp_coefficients : ndarray
Linear prediction coefficients
per_frame_gain : ndarray
Gain coefficients
residual_excitation : ndarray or None, optional (default=None)
Residual excitations. If None, this will be synthesized with white noise
voiced_frames : ndarray or None, optional (default=None)
Voiced frames. If None, all frames assumed to be voiced.
window_step : int, optional (default=128)
The size (in samples) of the space between each window
emphasis : float, optional (default=0.9)
The emphasis coefficient to use for filtering
Returns
-------
synthesized : ndarray
Sound vector synthesized from input arguments
References
----------
D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
"""
# TODO: Incorporate better synthesis from
# http://eecs.oregonstate.edu/education/docs/ece352/CompleteManual.pdf
window_size = 2 * window_step
[n_windows, order] = lp_coefficients.shape
n_points = (n_windows + 1) * window_step
n_excitation_points = n_points + window_step + window_step // 2
random_state = np.random.RandomState(1999)
if residual_excitation is None:
# Need to generate excitation
if voiced_frames is None:
# No voiced/unvoiced info, so just use randn
voiced_frames = np.ones((lp_coefficients.shape[0], 1))
residual_excitation = np.zeros((n_excitation_points))
f, m = lpc_to_frequency(lp_coefficients, per_frame_gain)
t = np.linspace(0, 1, window_size, endpoint=False)
hanning = sg.hanning(window_size)
for window in range(n_windows):
window_base = window * window_step
index = window_base + np.arange(window_size)
if voiced_frames[window]:
sig = np.zeros_like(t)
cycles = np.cumsum(f[window][0] * t)
sig += sg.sawtooth(cycles, 0.001)
residual_excitation[index] += hanning * sig
residual_excitation[index] += hanning * 0.01 * random_state.randn(
window_size)
else:
n_excitation_points = residual_excitation.shape[0]
n_points = n_excitation_points + window_step + window_step // 2
residual_excitation = np.hstack((residual_excitation,
np.zeros(window_size)))
if voiced_frames is None:
voiced_frames = np.ones_like(per_frame_gain)
synthesized = np.zeros((n_points))
for window in range(n_windows):
window_base = window * window_step
oldbit = synthesized[window_base + np.arange(window_step)]
w_coefs = lp_coefficients[window]
if not np.all(w_coefs):
# Hack to make lfilter avoid
# ValueError: BUG: filter coefficient a[0] == 0 not supported yet
# when all coeffs are 0
w_coefs = [1]
g_coefs = voiced_frames[window] * per_frame_gain[window]
index = window_base + np.arange(window_size)
newbit = g_coefs * sg.lfilter([1], w_coefs,
residual_excitation[index])
synthesized[index] = np.hstack((oldbit, np.zeros(
(window_size - window_step))))
synthesized[index] += sg.hanning(window_size) * newbit
synthesized = sg.lfilter([1], [1, -emphasis], synthesized)
return synthesized
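# A compact analysis/resynthesis sketch (added; the fuller pipeline lives in
# run_lpc_example below): analyze with lpc_analysis, then drive lpc_synthesis
# with the residual excitation. Output filename is illustrative.
def example_lpc_roundtrip():
    fs, d = fetch_sample_speech_tapestry()
    a, g, e = lpc_analysis(d, order=8, window_step=128, window_size=256)
    d_r = lpc_synthesis(a, g, e, window_step=128)
    wavfile.write("lpc_roundtrip.wav", fs, soundsc(d_r))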
def soundsc(X, copy=True):
"""
Approximate implementation of soundsc from MATLAB without the audio playing.
Parameters
----------
X : ndarray
Signal to be rescaled
copy : bool, optional (default=True)
Whether to make a copy of input signal or operate in place.
Returns
-------
X_sc : ndarray
(-1, 1) scaled version of X as float32, suitable for writing
with scipy.io.wavfile
"""
X = np.array(X, copy=copy)
X = (X - X.min()) / (X.max() - X.min())
X = .9 * X
X = 2 * X - 1
return X.astype('float32')
def lpc_to_frequency(lp_coefficients, per_frame_gain):
"""
Extract resonant frequencies and magnitudes from LPC coefficients and gains.
Parameters
----------
lp_coefficients : ndarray
LPC coefficients, such as those calculated by ``lpc_analysis``
per_frame_gain : ndarray
Gain calculated for each frame, such as those calculated
by ``lpc_analysis``
Returns
-------
frequencies : ndarray
Resonant frequencies calculated from LPC coefficients and gain. Returned
frequencies are from 0 to 2 * pi
magnitudes : ndarray
Magnitudes of resonant frequencies
References
----------
D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
"""
n_windows, order = lp_coefficients.shape
frame_frequencies = np.zeros((n_windows, (order - 1) // 2))
frame_magnitudes = np.zeros_like(frame_frequencies)
for window in range(n_windows):
w_coefs = lp_coefficients[window]
g_coefs = per_frame_gain[window]
roots = np.roots(np.hstack(([1], w_coefs[1:])))
# Roots doesn't return the same thing as MATLAB... agh
frequencies, index = np.unique(
np.abs(np.angle(roots)), return_index=True)
# Make sure 0 doesn't show up...
gtz = np.where(frequencies > 0)[0]
frequencies = frequencies[gtz]
index = index[gtz]
magnitudes = g_coefs / (1. - np.abs(roots))
sort_index = np.argsort(frequencies)
frame_frequencies[window, :len(sort_index)] = frequencies[sort_index]
frame_magnitudes[window, :len(sort_index)] = magnitudes[sort_index]
return frame_frequencies, frame_magnitudes
def sinusoid_analysis(X, input_sample_rate, resample_block=128, copy=True):
"""
Construct a sinusoidal model for the input signal.
Parameters
----------
X : ndarray
Input signal to model
input_sample_rate : int
The sample rate of the input signal
resample_block : int, optional (default=128)
Controls the step size of the sinusoidal model
Returns
-------
frequencies_hz : ndarray
Frequencies for the sinusoids, in Hz.
magnitudes : ndarray
Magnitudes of sinusoids returned in ``frequencies``
References
----------
D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
"""
X = np.array(X, copy=copy)
resample_to = 8000
if input_sample_rate != resample_to:
if input_sample_rate % resample_to != 0:
raise ValueError("Input sample rate must be a multiple of 8k!")
# Should be able to use resample... ?
# resampled_count = round(len(X) * resample_to / input_sample_rate)
# X = sg.resample(X, resampled_count, window=sg.hanning(len(X)))
X = sg.decimate(X, input_sample_rate // resample_to)
step_size = 2 * round(resample_block / input_sample_rate * resample_to / 2.)
a, g, e = lpc_analysis(X, order=8, window_step=step_size,
window_size=2 * step_size)
f, m = lpc_to_frequency(a, g)
f_hz = f * resample_to / (2 * np.pi)
return f_hz, m
def slinterp(X, factor, copy=True):
"""
Slow-ish linear interpolation of a 1D numpy array. There must be some
better function to do this in numpy.
Parameters
----------
X : ndarray
1D input array to interpolate
factor : int
Integer factor to interpolate by
Returns
-------
X_r : ndarray
Linearly interpolated version of X
"""
sz = np.product(X.shape)
X = np.array(X, copy=copy)
X_s = np.hstack((X[1:], [0]))
X_r = np.zeros((factor, sz))
for i in range(factor):
X_r[i, :] = (factor - i) / float(factor) * X + (i / float(factor)) * X_s
return X_r.T.ravel()[:(sz - 1) * factor + 1]
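# Worked example (added): slinterp(np.array([0., 1., 2.]), 2) returns
# [0., 0.5, 1., 1.5, 2.] - interpolation by an integer factor, keeping the
# original samples in place.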
def sinusoid_synthesis(frequencies_hz, magnitudes, input_sample_rate=16000,
resample_block=128):
"""
Create a time series based on input frequencies and magnitudes.
Parameters
----------
frequencies_hz : ndarray
Frequencies of the sinusoids, in Hz
magnitudes : ndarray
Magnitudes for each sinusoid track
input_sample_rate : int, optional (default=16000)
The sample rate parameter that the sinusoid analysis was run with
resample_block : int, optional (default=128)
Controls the step size of the sinusoidal model
Returns
-------
synthesized : ndarray
Sound vector synthesized from input arguments
References
----------
D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
"""
rows, cols = frequencies_hz.shape
synthesized = np.zeros((1 + ((rows - 1) * resample_block),))
for col in range(cols):
mags = slinterp(magnitudes[:, col], resample_block)
freqs = slinterp(frequencies_hz[:, col], resample_block)
cycles = np.cumsum(2 * np.pi * freqs / float(input_sample_rate))
sines = mags * np.cos(cycles)
synthesized += sines
return synthesized
def compress(X, n_components, window_size=128):
"""
Compress using the DCT
Parameters
----------
X : ndarray, shape=(n_samples,)
The input signal to compress. Should be 1-dimensional
n_components : int
The number of DCT components to keep. Setting n_components to about
.5 * window_size can give compression with fairly good reconstruction.
window_size : int
The input X is broken into windows of window_size, each of which are
then compressed with the DCT.
Returns
-------
X_compressed : ndarray, shape=(num_windows, window_size)
A 2D array of non-overlapping DCT coefficients. For use with uncompress
Reference
---------
http://nbviewer.ipython.org/github/craffel/crucialpython/blob/master/week3/stride_tricks.ipynb
"""
if len(X) % window_size != 0:
append = np.zeros((window_size - len(X) % window_size))
X = np.hstack((X, append))
num_frames = len(X) // window_size
X_strided = X.reshape((num_frames, window_size))
X_dct = fftpack.dct(X_strided, norm='ortho')
if n_components is not None:
X_dct = X_dct[:, :n_components]
return X_dct
def uncompress(X_compressed, window_size=128):
"""
Uncompress a DCT compressed signal (such as returned by ``compress``).
Parameters
----------
X_compressed : ndarray, shape=(n_samples, n_features)
Windowed and compressed array.
window_size : int, optional (default=128)
Size of the window used when ``compress`` was called.
Returns
-------
X_reconstructed : ndarray, shape=(n_samples)
Reconstructed version of X.
"""
if X_compressed.shape[1] % window_size != 0:
append = np.zeros((X_compressed.shape[0],
window_size - X_compressed.shape[1] % window_size))
X_compressed = np.hstack((X_compressed, append))
X_r = fftpack.idct(X_compressed, norm='ortho')
return X_r.ravel()
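# Round-trip sketch (added): keep half the DCT components per window for a
# lossy but close reconstruction.
def example_dct_compress_roundtrip():
    fs, d = fetch_sample_speech_tapestry()
    c = compress(d, n_components=64, window_size=128)
    d_r = uncompress(c, window_size=128)
    n = min(len(d), len(d_r))
    print("Mean abs DCT reconstruction error: %f" % np.abs(d_r[:n] - d[:n]).mean())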
def sine_window(X):
"""
Apply a sinusoid window to X.
Parameters
----------
X : ndarray, shape=(n_samples, n_features)
Input array of samples
Returns
-------
X_windowed : ndarray, shape=(n_samples, n_features)
Windowed version of X.
"""
i = np.arange(X.shape[1])
win = np.sin(np.pi * (i + 0.5) / X.shape[1])
row_stride = 0
col_stride = win.itemsize
strided_win = as_strided(win, shape=X.shape,
strides=(row_stride, col_stride))
return X * strided_win
def kaiserbessel_window(X, alpha=6.5):
"""
Apply a Kaiser-Bessel window to X.
Parameters
----------
X : ndarray, shape=(n_samples, n_features)
Input array of samples
alpha : float, optional (default=6.5)
Tuning parameter for Kaiser-Bessel function. alpha=6.5 should make
perfect reconstruction possible for DCT.
Returns
-------
X_windowed : ndarray, shape=(n_samples, n_features)
Windowed version of X.
"""
beta = np.pi * alpha
win = sg.kaiser(X.shape[1], beta)
row_stride = 0
col_stride = win.itemsize
strided_win = as_strided(win, shape=X.shape,
strides=(row_stride, col_stride))
return X * strided_win
def overlap(X, window_size, window_step):
"""
Create an overlapped version of X
Parameters
----------
X : ndarray, shape=(n_samples,)
Input signal to window and overlap
window_size : int
Size of windows to take
window_step : int
Step size between windows
Returns
-------
X_strided : shape=(n_windows, window_size)
2D array of overlapped X
"""
if window_size % 2 != 0:
raise ValueError("Window size must be even!")
# Make sure there are an even number of windows before stridetricks
append = np.zeros((window_size - len(X) % window_size))
X = np.hstack((X, append))
num_frames = len(X) // window_step - 1
row_stride = X.itemsize * window_step
col_stride = X.itemsize
X_strided = as_strided(X, shape=(num_frames, window_size),
strides=(row_stride, col_stride))
return X_strided
def halfoverlap(X, window_size):
"""
Create an overlapped version of X using 50% of window_size as overlap.
Parameters
----------
X : ndarray, shape=(n_samples,)
Input signal to window and overlap
window_size : int
Size of windows to take
Returns
-------
X_strided : shape=(n_windows, window_size)
2D array of overlapped X
"""
if window_size % 2 != 0:
raise ValueError("Window size must be even!")
window_step = window_size // 2
# Make sure there are an even number of windows before stridetricks
append = np.zeros((window_size - len(X) % window_size))
X = np.hstack((X, append))
num_frames = len(X) // window_step - 1
row_stride = X.itemsize * window_step
col_stride = X.itemsize
X_strided = as_strided(X, shape=(num_frames, window_size),
strides=(row_stride, col_stride))
return X_strided
def invert_halfoverlap(X_strided):
"""
Invert ``halfoverlap`` function to reconstruct X
Parameters
----------
X_strided : ndarray, shape=(n_windows, window_size)
X as overlapped windows
Returns
-------
X : ndarray, shape=(n_samples,)
Reconstructed version of X
"""
# Hardcoded 50% overlap! Can generalize later...
n_rows, n_cols = X_strided.shape
X = np.zeros((((int(n_rows // 2) + 1) * n_cols),)).astype(X_strided.dtype)
start_index = 0
end_index = n_cols
window_step = n_cols // 2
for row in range(X_strided.shape[0]):
X[start_index:end_index] += X_strided[row]
start_index += window_step
end_index += window_step
return X
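# Note (added): summing 50%-overlapped frames doubles the interior amplitude,
# so invert_halfoverlap exactly undoes halfoverlap only when a complementary
# window is applied in both directions - e.g. sine_window in mdct_slow and
# imdct_slow above, whose squared overlap sums to one.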
def csvd(arr):
"""
Do the complex SVD of a 2D array, returning real valued U, S, VT
http://stemblab.github.io/complex-svd/
"""
C_r = arr.real
C_i = arr.imag
block_x = C_r.shape[0]
block_y = C_r.shape[1]
K = np.zeros((2 * block_x, 2 * block_y))
# Upper left
K[:block_x, :block_y] = C_r
# Upper right
K[:block_x, block_y:] = C_i
# Lower left
K[block_x:, :block_y] = -C_i
# Lower right
K[block_x:, block_y:] = C_r
return svd(K, full_matrices=False)
def icsvd(U, S, VT):
"""
Invert back to complex values from the output of csvd
U, S, VT = csvd(X)
X_rec = icsvd(U, S, VT)
"""
K = U.dot(np.diag(S)).dot(VT)
block_x = U.shape[0] // 2
block_y = U.shape[1] // 2
arr_rec = np.zeros((block_x, block_y)) + 0j
arr_rec.real = K[:block_x, :block_y]
arr_rec.imag = K[:block_x, block_y:]
return arr_rec
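# Round-trip sketch (added): the real block embedding used by csvd is exactly
# invertible for square inputs.
def example_csvd_roundtrip():
    random_state = np.random.RandomState(1999)
    arr = random_state.randn(4, 4) + 1j * random_state.randn(4, 4)
    U, S, VT = csvd(arr)
    arr_rec = icsvd(U, S, VT)
    assert_almost_equal(arr, arr_rec, decimal=8)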
def overlap_compress(X, n_components, window_size):
"""
Overlap (at 50% of window_size) and compress X.
Parameters
----------
X : ndarray, shape=(n_samples,)
Input signal to compress
n_components : int
number of DCT components to keep
window_size : int
Size of windows to take
Returns
-------
X_dct : ndarray, shape=(n_windows, n_components)
Windowed and compressed version of X
"""
X_strided = halfoverlap(X, window_size)
X_dct = fftpack.dct(X_strided, norm='ortho')
if n_components is not None:
X_dct = X_dct[:, :n_components]
return X_dct
# Evil voice is caused by adding double the zeros before inverse DCT...
# Very cool bug but makes sense
def overlap_uncompress(X_compressed, window_size):
"""
Uncompress X as returned from ``overlap_compress``.
Parameters
----------
X_compressed : ndarray, shape=(n_windows, n_components)
Windowed and compressed version of X
window_size : int
Size of windows originally used when compressing X
Returns
-------
X_reconstructed : ndarray, shape=(n_samples,)
Reconstructed version of X
"""
if X_compressed.shape[1] % window_size != 0:
append = np.zeros((X_compressed.shape[0], window_size -
X_compressed.shape[1] % window_size))
X_compressed = np.hstack((X_compressed, append))
X_r = fftpack.idct(X_compressed, norm='ortho')
return invert_halfoverlap(X_r)
def lpc_to_lsf(all_lpc):
if len(all_lpc.shape) < 2:
all_lpc = all_lpc[None]
order = all_lpc.shape[1] - 1
all_lsf = np.zeros((len(all_lpc), order))
for i in range(len(all_lpc)):
lpc = all_lpc[i]
lpc1 = np.append(lpc, 0)
lpc2 = lpc1[::-1]
sum_filt = lpc1 + lpc2
diff_filt = lpc1 - lpc2
if order % 2 != 0:
deconv_diff, _ = sg.deconvolve(diff_filt, [1, 0, -1])
deconv_sum = sum_filt
else:
deconv_diff, _ = sg.deconvolve(diff_filt, [1, -1])
deconv_sum, _ = sg.deconvolve(sum_filt, [1, 1])
roots_diff = np.roots(deconv_diff)
roots_sum = np.roots(deconv_sum)
angle_diff = np.angle(roots_diff[::2])
angle_sum = np.angle(roots_sum[::2])
lsf = np.sort(np.hstack((angle_diff, angle_sum)))
if len(lsf) != 0:
all_lsf[i] = lsf
return np.squeeze(all_lsf)
def lsf_to_lpc(all_lsf):
if len(all_lsf.shape) < 2:
all_lsf = all_lsf[None]
order = all_lsf.shape[1]
all_lpc = np.zeros((len(all_lsf), order + 1))
for i in range(len(all_lsf)):
lsf = all_lsf[i]
zeros = np.exp(1j * lsf)
sum_zeros = zeros[::2]
diff_zeros = zeros[1::2]
sum_zeros = np.hstack((sum_zeros, np.conj(sum_zeros)))
diff_zeros = np.hstack((diff_zeros, np.conj(diff_zeros)))
sum_filt = np.poly(sum_zeros)
diff_filt = np.poly(diff_zeros)
if order % 2 != 0:
deconv_diff = sg.convolve(diff_filt, [1, 0, -1])
deconv_sum = sum_filt
else:
deconv_diff = sg.convolve(diff_filt, [1, -1])
deconv_sum = sg.convolve(sum_filt, [1, 1])
lpc = .5 * (deconv_sum + deconv_diff)
# Last coefficient is 0 and not returned
all_lpc[i] = lpc[:-1]
return np.squeeze(all_lpc)
def herz_to_mel(freqs):
"""
Based on code by Dan Ellis
http://labrosa.ee.columbia.edu/matlab/tf_agc/
"""
f_0 = 0 # 133.33333
f_sp = 200 / 3. # 66.66667
bark_freq = 1000.
bark_pt = (bark_freq - f_0) / f_sp
# The magic 1.0711703 which is the ratio needed to get from 1000 Hz
# to 6400 Hz in 27 steps, and is *almost* the ratio between 1000 Hz
# and the preceding linear filter center at 933.33333 Hz
# (actually 1000/933.33333 = 1.07142857142857 and
# exp(log(6.4)/27) = 1.07117028749447)
if not isinstance(freqs, np.ndarray):
freqs = np.array(freqs)[None]
log_step = np.exp(np.log(6.4) / 27)
lin_pts = (freqs < bark_freq)
mel = 0. * freqs
mel[lin_pts] = (freqs[lin_pts] - f_0) / f_sp
mel[~lin_pts] = bark_pt + np.log(freqs[~lin_pts] / bark_freq) / np.log(
log_step)
return mel
def mel_to_herz(mel):
"""
Based on code by Dan Ellis
http://labrosa.ee.columbia.edu/matlab/tf_agc/
"""
f_0 = 0 # 133.33333
f_sp = 200 / 3. # 66.66667
bark_freq = 1000.
bark_pt = (bark_freq - f_0) / f_sp
# The magic 1.0711703 which is the ratio needed to get from 1000 Hz
# to 6400 Hz in 27 steps, and is *almost* the ratio between 1000 Hz
# and the preceding linear filter center at 933.33333 Hz
# (actually 1000/933.33333 = 1.07142857142857 and
# exp(log(6.4)/27) = 1.07117028749447)
if not isinstance(mel, np.ndarray):
mel = np.array(mel)[None]
log_step = np.exp(np.log(6.4) / 27)
lin_pts = (mel < bark_pt)
freqs = 0. * mel
freqs[lin_pts] = f_0 + f_sp * mel[lin_pts]
freqs[~lin_pts] = bark_freq * np.exp(np.log(log_step) * (
mel[~lin_pts] - bark_pt))
return freqs
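# Consistency check (added): herz_to_mel and mel_to_herz are inverses on both
# the linear (< 1 kHz) and log (> 1 kHz) segments, e.g.
# mel_to_herz(herz_to_mel(np.array([440., 4400.]))) ~= [440., 4400.]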
def mel_freq_weights(n_fft, fs, n_filts=None, width=None):
"""
Based on code by Dan Ellis
http://labrosa.ee.columbia.edu/matlab/tf_agc/
"""
min_freq = 0
max_freq = fs // 2
if width is None:
width = 1.
if n_filts is None:
n_filts = int(herz_to_mel(max_freq) / 2) + 1
else:
n_filts = int(n_filts)
assert n_filts > 0
weights = np.zeros((n_filts, n_fft))
fft_freqs = np.arange(n_fft // 2) / n_fft * fs
min_mel = herz_to_mel(min_freq)
max_mel = herz_to_mel(max_freq)
partial = np.arange(n_filts + 2) / (n_filts + 1) * (max_mel - min_mel)
bin_freqs = mel_to_herz(min_mel + partial)
bin_bin = np.round(bin_freqs / fs * (n_fft - 1))
for i in range(n_filts):
fs_i = bin_freqs[i + np.arange(3)]
fs_i = fs_i[1] + width * (fs_i - fs_i[1])
lo_slope = (fft_freqs - fs_i[0]) / float(fs_i[1] - fs_i[0])
hi_slope = (fs_i[2] - fft_freqs) / float(fs_i[2] - fs_i[1])
weights[i, :n_fft // 2] = np.maximum(
0, np.minimum(lo_slope, hi_slope))
# Constant amplitude multiplier
weights = np.diag(2. / (bin_freqs[2:n_filts + 2]
- bin_freqs[:n_filts])).dot(weights)
weights[:, n_fft // 2:] = 0
return weights
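# Usage sketch (added): build a 40-filter mel weight matrix for a 512-point
# FFT and project one-sided magnitude spectra onto it.
def example_mel_weights():
    fs, d = fetch_sample_speech_tapestry()
    X_abs = np.abs(stft(d, fftsize=512, mean_normalize=False))
    f_to_a = mel_freq_weights(512, fs, n_filts=40)[:, :256]
    mel_spec = X_abs.dot(f_to_a.T)
    print("Mel spectrogram shape: %s" % str(mel_spec.shape))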
def time_attack_agc(X, fs, t_scale=0.5, f_scale=1.):
"""
AGC based on code by Dan Ellis
http://labrosa.ee.columbia.edu/matlab/tf_agc/
"""
# 32 ms grid for FFT
n_fft = 2 ** int(np.log(0.032 * fs) / np.log(2))
f_scale = float(f_scale)
window_size = n_fft
window_step = window_size // 2
X_freq = stft(X, window_size, mean_normalize=False)
fft_fs = fs / window_step
n_bands = max(10, 20 / f_scale)
mel_width = f_scale * n_bands / 10.
f_to_a = mel_freq_weights(n_fft, fs, n_bands, mel_width)
f_to_a = f_to_a[:, :n_fft // 2]
audiogram = np.abs(X_freq).dot(f_to_a.T)
fbg = np.zeros_like(audiogram)
state = np.zeros((audiogram.shape[1],))
alpha = np.exp(-(1. / fft_fs) / t_scale)
for i in range(len(audiogram)):
state = np.maximum(alpha * state, audiogram[i])
fbg[i] = state
sf_to_a = np.sum(f_to_a, axis=0)
E = np.diag(1. / (sf_to_a + (sf_to_a == 0)))
E = E.dot(f_to_a.T)
E = fbg.dot(E.T)
E[E <= 0] = np.min(E[E > 0])
ts = istft(X_freq / E, window_size, mean_normalize=False)
return ts, X_freq, E
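# Usage sketch (added; the same call pattern appears in run_fft_vq_example
# below): apply time/frequency AGC to a speech sample and write the result.
# Output filename is illustrative.
def example_time_attack_agc():
    fs, d = fetch_sample_speech_tapestry()
    d_agc, X_freq, E = time_attack_agc(d, fs, t_scale=.5, f_scale=5.)
    wavfile.write("agc_example.wav", fs, soundsc(d_agc))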
def hebbian_kmeans(X, n_clusters=10, n_epochs=10, W=None, learning_rate=0.01,
batch_size=100, random_state=None, verbose=True):
"""
Modified from existing code from R. Memisevic
See http://www.cs.toronto.edu/~rfm/code/hebbian_kmeans.py
"""
if W is None:
if random_state is None:
random_state = np.random.RandomState()
W = 0.1 * random_state.randn(n_clusters, X.shape[1])
else:
assert n_clusters == W.shape[0]
X2 = (X ** 2).sum(axis=1, keepdims=True)
last_print = 0
for e in range(n_epochs):
for i in range(0, X.shape[0], batch_size):
X_i = X[i: i + batch_size]
X2_i = X2[i: i + batch_size]
D = -2 * np.dot(W, X_i.T)
D += (W ** 2).sum(axis=1, keepdims=True)
D += X2_i.T
S = (D == D.min(axis=0)[None, :]).astype("float").T
W += learning_rate * (
np.dot(S.T, X_i) - S.sum(axis=0)[:, None] * W)
if verbose:
if e == 0 or e > (.05 * n_epochs + last_print):
last_print = e
print("Epoch %i of %i, cost %.4f" % (
e + 1, n_epochs, D.min(axis=0).sum()))
return W
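# Usage sketch (added): cluster STFT magnitude frames with Hebbian k-means.
def example_hebbian_kmeans():
    fs, d = fetch_sample_speech_tapestry()
    X_abs = np.abs(stft(d, fftsize=128))
    W = hebbian_kmeans(X_abs, n_clusters=10, n_epochs=10)
    print("Learned centroids with shape %s" % str(W.shape))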
def complex_to_real_view(arr_c):
# Inplace view from complex to r, i as separate columns
assert arr_c.dtype in [np.complex64, np.complex128]
shp = arr_c.shape
dtype = np.float64 if arr_c.dtype == np.complex128 else np.float32
arr_r = arr_c.ravel().view(dtype=dtype).reshape(shp[0], 2 * shp[1])
return arr_r
def real_to_complex_view(arr_r):
# Inplace view from real, imag as columns to complex
assert arr_r.dtype not in [np.complex64, np.complex128]
shp = arr_r.shape
dtype = np.complex128 if arr_r.dtype == np.float64 else np.complex64
arr_c = arr_r.ravel().view(dtype=dtype).reshape(shp[0], shp[1] // 2)
return arr_c
def complex_to_abs(arr_c):
return np.abs(arr_c)
def complex_to_angle(arr_c):
return np.angle(arr_c)
def abs_and_angle_to_complex(arr_abs, arr_angle):
# abs(f_c2 - f_c) < 1E-15
return arr_abs * np.exp(1j * arr_angle)
def polyphase_core(x, m, f):
# x = input data
# m = decimation rate
# f = filter
# Hack job - append zeros to match decimation rate
if x.shape[0] % m != 0:
x = np.append(x, np.zeros((m - x.shape[0] % m,)))
if f.shape[0] % m != 0:
f = np.append(f, np.zeros((m - f.shape[0] % m,)))
polyphase = p = np.zeros((m, (x.shape[0] + f.shape[0]) // m), dtype=x.dtype)
p[0, :-1] = np.convolve(x[::m], f[::m])
# Invert the x values when applying filters
for i in range(1, m):
p[i, 1:] = np.convolve(x[m - i::m], f[i::m])
return p
def polyphase_single_filter(x, m, f):
return np.sum(polyphase_core(x, m, f), axis=0)
def polyphase_lowpass(arr, downsample=2, n_taps=50, filter_pad=1.1):
filt = firwin(downsample * n_taps, 1 / (downsample * filter_pad))
filtered = polyphase_single_filter(arr, downsample, filt)
return filtered
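# Usage sketch (added): the polyphase structure folds the anti-alias low-pass
# and the decimation together, so the output is already at the reduced rate.
# Output filename is illustrative.
def example_polyphase_lowpass():
    fs, d = fetch_sample_speech_tapestry()
    d_ds = polyphase_lowpass(d, downsample=2)
    wavfile.write("polyphase_downsample.wav", fs // 2, soundsc(d_ds))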
def window(arr, window_size, window_step=1, axis=0):
"""
Directly taken from Erik Rigtorp's post to numpy-discussion.
<http://www.mail-archive.com/[email protected]/msg29450.html>
<http://stackoverflow.com/questions/4936620/using-strides-for-an-efficient-moving-average-filter>
"""
if window_size < 1:
raise ValueError("`window_size` must be at least 1.")
if window_size > arr.shape[-1]:
raise ValueError("`window_size` is too long.")
orig = list(range(len(arr.shape)))
trans = list(range(len(arr.shape)))
trans[axis] = orig[-1]
trans[-1] = orig[axis]
arr = arr.transpose(trans)
shape = arr.shape[:-1] + (arr.shape[-1] - window_size + 1, window_size)
strides = arr.strides + (arr.strides[-1],)
strided = as_strided(arr, shape=shape, strides=strides)
if window_step > 1:
strided = strided[..., ::window_step, :]
orig = list(range(len(strided.shape)))
trans = list(range(len(strided.shape)))
trans[-2] = orig[-1]
trans[-1] = orig[-2]
trans = trans[::-1]
strided = strided.transpose(trans)
return strided
def unwindow(arr, window_size, window_step=1, axis=0):
# undo windows by broadcast
if axis != 0:
raise ValueError("axis != 0 currently unsupported")
shp = arr.shape
unwindowed = np.tile(arr[:, None, ...], (1, window_step, 1, 1))
unwindowed = unwindowed.reshape(shp[0] * window_step, *shp[1:])
return unwindowed.mean(axis=1)
def angle_to_sin_cos(arr_angle):
return np.hstack((np.sin(arr_angle), np.cos(arr_angle)))
def sin_cos_to_angle(arr_sin, arr_cos):
return np.arctan2(arr_sin, arr_cos)
def xcorr_offset(x1, x2):
"""
Under MSR-LA License
Based on MATLAB implementation from Spectrogram Inversion Toolbox
References
----------
D. Griffin and J. Lim. Signal estimation from modified
short-time Fourier transform. IEEE Trans. Acoust. Speech
Signal Process., 32(2):236-243, 1984.
Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory
Model Inversion for Sound Separation. Proc. IEEE-ICASSP,
Adelaide, 1994, II.77-80.
Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal
Estimation from Modified Short-Time Fourier Transform
Magnitude Spectra. IEEE Transactions on Audio Speech and
Language Processing, 08/2007.
"""
x1 = x1 - x1.mean()
x2 = x2 - x2.mean()
frame_size = len(x2)
half = frame_size // 2
corrs = np.convolve(x1.astype('float32'), x2[::-1].astype('float32'))
corrs[:half] = -1E30
corrs[-half:] = -1E30
offset = corrs.argmax() - len(x1)
return offset
def invert_spectrogram(X_s, step, calculate_offset=True, set_zero_phase=True):
"""
Under MSR-LA License
Based on MATLAB implementation from Spectrogram Inversion Toolbox
References
----------
D. Griffin and J. Lim. Signal estimation from modified
short-time Fourier transform. IEEE Trans. Acoust. Speech
Signal Process., 32(2):236-243, 1984.
Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory
Model Inversion for Sound Separation. Proc. IEEE-ICASSP,
Adelaide, 1994, II.77-80.
Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal
Estimation from Modified Short-Time Fourier Transform
Magnitude Spectra. IEEE Transactions on Audio Speech and
Language Processing, 08/2007.
"""
size = int(X_s.shape[1] // 2)
wave = np.zeros((X_s.shape[0] * step + size))
# Getting overflow warnings with 32 bit...
wave = wave.astype('float64')
total_windowing_sum = np.zeros((X_s.shape[0] * step + size))
win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1))
est_start = int(size // 2) - 1
est_end = est_start + size
for i in range(X_s.shape[0]):
wave_start = int(step * i)
wave_end = wave_start + size
if set_zero_phase:
spectral_slice = X_s[i].real + 0j
else:
# already complex
spectral_slice = X_s[i]
# Don't need fftshift due to different impl.
wave_est = np.real(np.fft.ifft(spectral_slice))[::-1]
if calculate_offset and i > 0:
offset_size = size - step
if offset_size <= 0:
print("WARNING: Large step size >50\% detected! "
"This code works best with high overlap - try "
"with 75% or greater")
offset_size = step
offset = xcorr_offset(wave[wave_start:wave_start + offset_size],
wave_est[est_start:est_start + offset_size])
else:
offset = 0
wave[wave_start:wave_end] += win * wave_est[
est_start - offset:est_end - offset]
total_windowing_sum[wave_start:wave_end] += win
wave = np.real(wave) / (total_windowing_sum + 1E-6)
return wave
def iterate_invert_spectrogram(X_s, fftsize, step, n_iter=10, verbose=False):
"""
Under MSR-LA License
Based on MATLAB implementation from Spectrogram Inversion Toolbox
References
----------
D. Griffin and J. Lim. Signal estimation from modified
short-time Fourier transform. IEEE Trans. Acoust. Speech
Signal Process., 32(2):236-243, 1984.
Malcolm Slaney, Daniel Naar and Richard F. Lyon. Auditory
Model Inversion for Sound Separation. Proc. IEEE-ICASSP,
Adelaide, 1994, II.77-80.
Xinglei Zhu, G. Beauregard, L. Wyse. Real-Time Signal
Estimation from Modified Short-Time Fourier Transform
Magnitude Spectra. IEEE Transactions on Audio Speech and
Language Processing, 08/2007.
"""
reg = np.max(X_s) / 1E8
X_best = copy.deepcopy(X_s)
for i in range(n_iter):
if verbose:
print("Runnning iter %i" % i)
if i == 0:
X_t = invert_spectrogram(X_best, step, calculate_offset=True,
set_zero_phase=True)
else:
# Calculate offset was False in the MATLAB version
# but in mine it massively improves the result
# Possible bug in my impl?
X_t = invert_spectrogram(X_best, step, calculate_offset=True,
set_zero_phase=False)
est = stft(X_t, fftsize=fftsize, step=step, compute_onesided=False)
phase = est / np.maximum(reg, np.abs(est))
X_best = X_s * phase[:len(X_s)]
X_t = invert_spectrogram(X_best, step, calculate_offset=True,
set_zero_phase=False)
return np.real(X_t)
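# Phase reconstruction sketch (added) along the lines of the
# run_phase_reconstruction() mentioned in the version warning at the top of
# this file: Griffin-Lim style iteration recovers a listenable signal from
# magnitude-only spectra. Output filename is illustrative.
def example_phase_reconstruction():
    fs, d = fetch_sample_speech_tapestry()
    fftsize = 512
    step = 64
    X_s = np.abs(stft(d, fftsize=fftsize, step=step, real=False,
                      compute_onesided=False))
    X_t = iterate_invert_spectrogram(X_s, fftsize, step, verbose=True)
    wavfile.write("phase_reconstruction.wav", fs, soundsc(X_t))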
def implot(arr, scale=None, title="", cmap="gray"):
import matplotlib.pyplot as plt
if scale == "specgram":
# plotting part
mag = 20. * np.log10(np.abs(arr))
# Transpose so time is X axis, and invert y axis so
# frequency is low at bottom
mag = mag.T[::-1, :]
else:
mag = arr
f, ax = plt.subplots()
ax.matshow(mag, cmap=cmap)
plt.axis("off")
x1 = mag.shape[0]
y1 = mag.shape[1]
def autoaspect(x_range, y_range):
"""
The aspect to make a plot square with ax.set_aspect in Matplotlib
"""
mx = max(x_range, y_range)
mn = min(x_range, y_range)
if x_range <= y_range:
return mx / float(mn)
else:
return mn / float(mx)
asp = autoaspect(x1, y1)
ax.set_aspect(asp)
plt.title(title)
def test_lpc_to_lsf():
# Matlab style vectors for testing
# lsf = [0.7842 1.5605 1.8776 1.8984 2.3593]
# a = [1.0000 0.6149 0.9899 0.0000 0.0031 -0.0082];
lsf = [[0.7842, 1.5605, 1.8776, 1.8984, 2.3593],
[0.7842, 1.5605, 1.8776, 1.8984, 2.3593]]
a = [[1.0000, 0.6149, 0.9899, 0.0000, 0.0031, -0.0082],
[1.0000, 0.6149, 0.9899, 0.0000, 0.0031, -0.0082]]
a = np.array(a)
lsf = np.array(lsf)
lsf_r = lpc_to_lsf(a)
assert_almost_equal(lsf, lsf_r, decimal=4)
a_r = lsf_to_lpc(lsf)
assert_almost_equal(a, a_r, decimal=4)
lsf_r = lpc_to_lsf(a[0])
assert_almost_equal(lsf[0], lsf_r, decimal=4)
a_r = lsf_to_lpc(lsf[0])
assert_almost_equal(a[0], a_r, decimal=4)
def test_lpc_analysis_truncate():
# Test that truncate doesn't crash and actually truncates
[a, g, e] = lpc_analysis(np.random.randn(85), order=8, window_step=80,
window_size=80, emphasis=0.9, truncate=True)
assert(a.shape[0] == 1)
def test_feature_build():
samplerate, X = fetch_sample_music()
# MATLAB wavread does normalization
X = X.astype('float32') / (2 ** 15)
wsz = 256
wst = 128
a, g, e = lpc_analysis(X, order=8, window_step=wst,
window_size=wsz, emphasis=0.9,
copy=True)
v, p = voiced_unvoiced(X, window_size=wsz,
window_step=wst)
c = compress(e, n_components=64)
# First component of a is always 1
combined = np.hstack((a[:, 1:], g, c[:a.shape[0]]))
features = np.zeros((a.shape[0], 2 * combined.shape[1]))
start_indices = v * combined.shape[1]
start_indices = start_indices.astype('int32')
end_indices = (v + 1) * combined.shape[1]
end_indices = end_indices.astype('int32')
for i in range(features.shape[0]):
features[i, start_indices[i]:end_indices[i]] = combined[i]
def test_mdct_and_inverse():
fs, X = fetch_sample_music()
X_dct = mdct_slow(X)
X_r = imdct_slow(X_dct)
assert np.all(np.abs(X_r[:len(X)] - X) < 1E-3)
assert np.abs(X_r[:len(X)] - X).mean() < 1E-6
def test_all():
test_lpc_analysis_truncate()
test_feature_build()
test_lpc_to_lsf()
test_mdct_and_inverse()
def run_lpc_example():
# ae.wav is from
# http://www.linguistics.ucla.edu/people/hayes/103/Charts/VChart/ae.wav
# Partially following the formant tutorial here
# http://www.mathworks.com/help/signal/ug/formant-estimation-with-lpc-coefficients.html
samplerate, X = fetch_sample_music()
c = overlap_compress(X, 200, 400)
X_r = overlap_uncompress(c, 400)
wavfile.write('lpc_uncompress.wav', samplerate, soundsc(X_r))
print("Calculating sinusoids")
f_hz, m = sinusoid_analysis(X, input_sample_rate=16000)
Xs_sine = sinusoid_synthesis(f_hz, m)
orig_fname = 'lpc_orig.wav'
sine_fname = 'lpc_sine_synth.wav'
wavfile.write(orig_fname, samplerate, soundsc(X))
wavfile.write(sine_fname, samplerate, soundsc(Xs_sine))
lpc_order_list = [8, ]
dct_components_list = [200, ]
window_size_list = [400, ]
# Seems like a dct component size of ~2/3rds the step
# (1/3rd the window for 50% overlap) works well.
for lpc_order in lpc_order_list:
for dct_components in dct_components_list:
for window_size in window_size_list:
# 50% overlap
window_step = window_size // 2
a, g, e = lpc_analysis(X, order=lpc_order,
window_step=window_step,
window_size=window_size, emphasis=0.9,
copy=True)
print("Calculating LSF")
lsf = lpc_to_lsf(a)
# Not window_size - window_step! Need to implement overlap
print("Calculating compression")
c = compress(e, n_components=dct_components,
window_size=window_step)
co = overlap_compress(e, n_components=dct_components,
window_size=window_step)
block_excitation = uncompress(c, window_size=window_step)
overlap_excitation = overlap_uncompress(co,
window_size=window_step)
a_r = lsf_to_lpc(lsf)
f, m = lpc_to_frequency(a_r, g)
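                # lpc_to_frequency presumably returns the pole frequencies
                # and magnitudes (formant estimates); computed for reference
                # but not used below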
block_lpc = lpc_synthesis(a_r, g, block_excitation,
emphasis=0.9,
window_step=window_step)
overlap_lpc = lpc_synthesis(a_r, g, overlap_excitation,
emphasis=0.9,
window_step=window_step)
v, p = voiced_unvoiced(X, window_size=window_size,
window_step=window_step)
noisy_lpc = lpc_synthesis(a_r, g, voiced_frames=v,
emphasis=0.9,
window_step=window_step)
if dct_components is None:
dct_components = window_size
noisy_fname = 'lpc_noisy_synth_%iwin_%ilpc_%idct.wav' % (
window_size, lpc_order, dct_components)
block_fname = 'lpc_block_synth_%iwin_%ilpc_%idct.wav' % (
window_size, lpc_order, dct_components)
overlap_fname = 'lpc_overlap_synth_%iwin_%ilpc_%idct.wav' % (
window_size, lpc_order, dct_components)
wavfile.write(noisy_fname, samplerate, soundsc(noisy_lpc))
wavfile.write(block_fname, samplerate,
soundsc(block_lpc))
wavfile.write(overlap_fname, samplerate,
soundsc(overlap_lpc))


def run_fft_vq_example():
n_fft = 512
time_smoothing = 4
def _pre(list_of_data):
f_c = np.vstack([stft(dd, n_fft) for dd in list_of_data])
if len(f_c) % time_smoothing != 0:
newlen = len(f_c) - len(f_c) % time_smoothing
f_c = f_c[:newlen]
f_mag = complex_to_abs(f_c)
f_phs = complex_to_angle(f_c)
f_sincos = angle_to_sin_cos(f_phs)
f_r = np.hstack((f_mag, f_sincos))
f_r = f_r.reshape((len(f_r) // time_smoothing,
time_smoothing * f_r.shape[1]))
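        # each row now stacks time_smoothing consecutive STFT frames, so VQ
        # matches short blocks of spectra rather than single frames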
return f_r, n_fft
def preprocess_train(list_of_data, random_state):
f_r, n_fft = _pre(list_of_data)
clusters = f_r
return clusters
def apply_preprocess(list_of_data, clusters):
f_r, n_fft = _pre(list_of_data)
memberships, distances = vq(f_r, clusters)
vq_r = clusters[memberships]
vq_r = vq_r.reshape((time_smoothing * len(vq_r),
vq_r.shape[1] // time_smoothing))
f_mag = vq_r[:, :n_fft // 2]
f_sincos = vq_r[:, n_fft // 2:]
extent = f_sincos.shape[1] // 2
f_phs = sin_cos_to_angle(f_sincos[:, :extent], f_sincos[:, extent:])
vq_c = abs_and_angle_to_complex(f_mag, f_phs)
d_k = istft(vq_c, fftsize=n_fft)
return d_k
random_state = np.random.RandomState(1999)
"""
fs, d = fetch_sample_music()
sub = int(.8 * d.shape[0])
d1 = [d[:sub]]
d2 = [d[sub:]]
"""
fs, d = fetch_sample_speech_fruit()
d1 = d[::8] + d[1::8] + d[2::8] + d[3::8] + d[4::8] + d[5::8] + d[6::8]
d2 = d[7::8]
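    # d is a list of per-utterance arrays: take 7 of every 8 utterances for
    # "train" and hold out every 8th for "test"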
# make sure d1 and d2 aren't the same!
assert [len(di) for di in d1] != [len(di) for di in d2]
clusters = preprocess_train(d1, random_state)
# Training data
vq_d1 = apply_preprocess(d1, clusters)
vq_d2 = apply_preprocess(d2, clusters)
    # quantized train and test output shouldn't be identical
    assert any(i != j for i, j in zip(vq_d1.ravel(), vq_d2.ravel()))
fix_d1 = np.concatenate(d1)
fix_d2 = np.concatenate(d2)
wavfile.write("fft_train_no_agc.wav", fs, soundsc(fix_d1))
wavfile.write("fft_test_no_agc.wav", fs, soundsc(fix_d2))
wavfile.write("fft_vq_train_no_agc.wav", fs, soundsc(vq_d1, fs))
wavfile.write("fft_vq_test_no_agc.wav", fs, soundsc(vq_d2, fs))
agc_d1, freq_d1, energy_d1 = time_attack_agc(fix_d1, fs, .5, 5)
agc_d2, freq_d2, energy_d2 = time_attack_agc(fix_d2, fs, .5, 5)
agc_vq_d1, freq_vq_d1, energy_vq_d1 = time_attack_agc(vq_d1, fs, .5, 5)
agc_vq_d2, freq_vq_d2, energy_vq_d2 = time_attack_agc(vq_d2, fs, .5, 5)
wavfile.write("fft_train_agc.wav", fs, soundsc(agc_d1))
wavfile.write("fft_test_agc.wav", fs, soundsc(agc_d2))
wavfile.write("fft_vq_train_agc.wav", fs, soundsc(agc_vq_d1, fs))
wavfile.write("fft_vq_test_agc.wav", fs, soundsc(agc_vq_d2))


def run_dct_vq_example():
    def _pre(list_of_data):
        # The temporal window setting is crucial! 512 seems OK for music,
        # 256 for the fruit data, perhaps due to the different samplerates
n_dct = 512
f_r = np.vstack([mdct_slow(dd, n_dct) for dd in list_of_data])
return f_r, n_dct
def preprocess_train(list_of_data, random_state):
f_r, n_dct = _pre(list_of_data)
clusters = f_r
return clusters
def apply_preprocess(list_of_data, clusters):
f_r, n_dct = _pre(list_of_data)
f_clust = f_r
memberships, distances = vq(f_clust, clusters)
vq_r = clusters[memberships]
d_k = imdct_slow(vq_r, n_dct)
return d_k
random_state = np.random.RandomState(1999)
    # This doesn't work very well, since the only test data is a short
    # sample taken from the end of the same recording
fs, d = fetch_sample_music()
sub = int(.8 * d.shape[0])
d1 = [d[:sub]]
d2 = [d[sub:]]
"""
fs, d = fetch_sample_speech_fruit()
d1 = d[::8] + d[1::8] + d[2::8] + d[3::8] + d[4::8] + d[5::8] + d[6::8]
d2 = d[7::8]
# make sure d1 and d2 aren't the same!
assert [len(di) for di in d1] != [len(di) for di in d2]
"""
clusters = preprocess_train(d1, random_state)
# Training data
vq_d1 = apply_preprocess(d1, clusters)
vq_d2 = apply_preprocess(d2, clusters)
    assert any(i != j for i, j in zip(vq_d1.ravel(), vq_d2.ravel()))
fix_d1 = np.concatenate(d1)
fix_d2 = np.concatenate(d2)
wavfile.write("dct_train_no_agc.wav", fs, soundsc(fix_d1))
wavfile.write("dct_test_no_agc.wav", fs, soundsc(fix_d2))
wavfile.write("dct_vq_train_no_agc.wav", fs, soundsc(vq_d1))
wavfile.write("dct_vq_test_no_agc.wav", fs, soundsc(vq_d2))
"""
import matplotlib.pyplot as plt
plt.specgram(vq_d2, cmap="gray")
plt.figure()
plt.specgram(fix_d2, cmap="gray")
plt.show()
"""
agc_d1, freq_d1, energy_d1 = time_attack_agc(fix_d1, fs, .5, 5)
agc_d2, freq_d2, energy_d2 = time_attack_agc(fix_d2, fs, .5, 5)
agc_vq_d1, freq_vq_d1, energy_vq_d1 = time_attack_agc(vq_d1, fs, .5, 5)
agc_vq_d2, freq_vq_d2, energy_vq_d2 = time_attack_agc(vq_d2, fs, .5, 5)
wavfile.write("dct_train_agc.wav", fs, soundsc(agc_d1))
wavfile.write("dct_test_agc.wav", fs, soundsc(agc_d2))
wavfile.write("dct_vq_train_agc.wav", fs, soundsc(agc_vq_d1))
wavfile.write("dct_vq_test_agc.wav", fs, soundsc(agc_vq_d2))


def run_phase_reconstruction_example():
    fs, d = fetch_sample_speech_tapestry()
    # stft actually returns however many components you ask for, so use
    # double the window size given in the reference .m file
fftsize = 512
step = 64
X_s = np.abs(stft(d, fftsize=fftsize, step=step, real=False,
compute_onesided=False))
X_t = iterate_invert_spectrogram(X_s, fftsize, step, verbose=True)
"""
import matplotlib.pyplot as plt
plt.specgram(d, cmap="gray")
plt.savefig("1.png")
plt.close()
plt.imshow(X_s, cmap="gray")
plt.savefig("2.png")
plt.close()
"""
wavfile.write("phase_original.wav", fs, soundsc(d))
wavfile.write("phase_reconstruction.wav", fs, soundsc(X_t))


def run_phase_vq_example():
    def _pre(list_of_data):
        # The temporal window setting is crucial! 512 seems OK for music,
        # 256 for the fruit data, perhaps due to the different samplerates
n_fft = 256
step = 32
f_r = np.vstack([np.abs(stft(dd, n_fft, step=step, real=False,
compute_onesided=False))
for dd in list_of_data])
return f_r, n_fft, step
def preprocess_train(list_of_data, random_state):
f_r, n_fft, step = _pre(list_of_data)
clusters = copy.deepcopy(f_r)
return clusters
def apply_preprocess(list_of_data, clusters):
f_r, n_fft, step = _pre(list_of_data)
f_clust = f_r
# Nondeterministic ?
memberships, distances = vq(f_clust, clusters)
vq_r = clusters[memberships]
d_k = iterate_invert_spectrogram(vq_r, n_fft, step, verbose=True)
return d_k
random_state = np.random.RandomState(1999)
fs, d = fetch_sample_speech_fruit()
d1 = d[::9]
d2 = d[7::8][:5]
# make sure d1 and d2 aren't the same!
assert [len(di) for di in d1] != [len(di) for di in d2]
clusters = preprocess_train(d1, random_state)
fix_d1 = np.concatenate(d1)
fix_d2 = np.concatenate(d2)
vq_d2 = apply_preprocess(d2, clusters)
wavfile.write("phase_train_no_agc.wav", fs, soundsc(fix_d1))
wavfile.write("phase_vq_test_no_agc.wav", fs, soundsc(vq_d2))
agc_d1, freq_d1, energy_d1 = time_attack_agc(fix_d1, fs, .5, 5)
agc_d2, freq_d2, energy_d2 = time_attack_agc(fix_d2, fs, .5, 5)
agc_vq_d2, freq_vq_d2, energy_vq_d2 = time_attack_agc(vq_d2, fs, .5, 5)
"""
import matplotlib.pyplot as plt
plt.specgram(agc_vq_d2, cmap="gray")
#plt.title("Fake")
plt.figure()
plt.specgram(agc_d2, cmap="gray")
#plt.title("Real")
plt.show()
"""
wavfile.write("phase_train_agc.wav", fs, soundsc(agc_d1))
wavfile.write("phase_test_agc.wav", fs, soundsc(agc_d2))
wavfile.write("phase_vq_test_agc.wav", fs, soundsc(agc_vq_d2))


def run_cqt_example():
    # Requires a local wav file - adjust this path for your machine
    fs, d = fetch_sample_file("/Users/User/cqt_resources/kempff1.wav")
X = d[:44100]
X_cq, c_dc, c_nyq, multiscale, shift, window_lens = cqt(X, fs)
X_r = icqt(X_cq, c_dc, c_nyq, multiscale, shift, window_lens)
    # reconstruction SNR in dB (higher is better)
    SNR = 20 * np.log10(np.linalg.norm(X) / np.linalg.norm(X - X_r))
    print("CQT reconstruction SNR: %f dB" % SNR)
wavfile.write("cqt_original.wav", fs, soundsc(X))
wavfile.write("cqt_reconstruction.wav", fs, soundsc(X_r))


def run_fft_dct_example():
random_state = np.random.RandomState(1999)
fs, d = fetch_sample_speech_fruit()
n_fft = 64
X = d[0]
X_stft = stft(X, n_fft)
X_rr = complex_to_real_view(X_stft)
X_dct = fftpack.dct(X_rr, axis=-1, norm='ortho')
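    # work on frame-to-frame deltas of the DCT coefficients, lightly perturb
    # them with noise scaled by each dimension's std, then cumsum below
    # re-integrates (the first frame's absolute values are lost, so the
    # round trip is only approximate)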
X_dct_sub = X_dct[1:] - X_dct[:-1]
std = X_dct_sub.std(axis=0, keepdims=True)
X_dct_sub += .01 * std * random_state.randn(
X_dct_sub.shape[0], X_dct_sub.shape[1])
X_dct_unsub = np.cumsum(X_dct_sub, axis=0)
X_idct = fftpack.idct(X_dct_unsub, axis=-1, norm='ortho')
X_irr = real_to_complex_view(X_idct)
X_r = istft(X_irr, n_fft)[:len(X)]
    # reconstruction SNR in dB (higher is better)
    SNR = 20 * np.log10(np.linalg.norm(X) / np.linalg.norm(X - X_r))
    print("reconstruction SNR: %f dB" % SNR)
wavfile.write("fftdct_orig.wav", fs, soundsc(X))
wavfile.write("fftdct_rec.wav", fs, soundsc(X_r))


if __name__ == "__main__":
    """
    Trying to run all of the examples at once will segfault on my laptop -
    probably memory! Comment them in individually.
    """
# run_phase_reconstruction_example()
# run_phase_vq_example()
# run_dct_vq_example()
# run_fft_vq_example()
# run_lpc_example()
# run_cqt_example()
run_fft_dct_example()
test_all()