Created
May 25, 2014 01:14
-
-
Save soravux/1ce124315bc6d1f3d430 to your computer and use it in GitHub Desktop.
Revisions
-
soravux created this gist
May 25, 2014 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,115 @@ from scipy.io.wavfile import read, write from scipy.signal import find_peaks_cwt, firwin, lfilter, medfilt, resample, hamming from scipy.optimize import fmin import numpy as np from matplotlib import pyplot as plt import argparse class Tone: def __init__(self, begin_time=0.0, end_time=0.0, harmonics=None): # Unreadable but quick, cheap & dirty trick self.__dict__.update(**locals()) def __repr__(self): return "{begin_time}-{end_time}: {sorted_harmonics}".format( sorted_harmonics=sorted(self.harmonics.items()), **self.__dict__ ) def getTones(data, rate): """Get list of tones recognized in the data array.""" norm_data = data.astype('double') / ( 2 ** 15 - 1) x_axis = np.arange(len(norm_data)) / rate Pxx, ori_freqs, t, x = plt.specgram(data, NFFT=1024, noverlap=0, Fs=rate) tones = [] for blk_idx in range(len(Pxx[0])): fft_block = np.array(list(zip(*Pxx))[blk_idx])[:60] fft_block, freqs = resample(fft_block, 10*len(fft_block), ori_freqs[:60], window='hamming') # Find peaks peak_idx = find_peaks_cwt(fft_block[:len(fft_block)/1.8], np.arange(3, 20)) peak_idx = np.array(peak_idx) if len(peak_idx) and np.max(fft_block[peak_idx]) > 100.0: tone = Tone( blk_idx * 512 / 44100.0, ((blk_idx + 1) * 512 - 1) / 44100.0, { freqs[freq]: fft_block[freq] for freq in peak_idx }, ) tones.append(tone) return tones def mergeTones(tones, freqThr=0.06): """ Merge tones: begin and end times adjusted and average their harmonics. Two tones are considered the same if their main (most powerful) frequency are in a freqThr% difference. Input list must be ordered by time. """ if not tones: return [] main_harm = lambda x: max(x.harmonics.items(), key=lambda y: y[1]) main_freq = lambda x: float(main_harm(x)[0]) retVal = [tones[0]] for tone in tones[1:]: # If frequencies in both signals are near and proeminent for harm_freq, harm_ampli in tone.harmonics.items(): near_powerful = harm_ampli > 0.5 * main_harm(retVal[-1])[1] if not near_powerful: continue pdiff = abs(main_freq(retVal[-1]) - harm_freq) / main_freq(retVal[-1]) same_tone = pdiff < freqThr if same_tone: retVal[-1].end_time = tone.end_time break else: retVal.append(tone) return retVal def decodeDTMF(tones): DTMF = { '1': (697, 1209), '2': (697, 1336), '3': (697, 1477), '4': (770, 1209), '5': (770, 1336), '6': (770, 1477), '7': (852, 1209), '8': (852, 1336), '9': (852, 1477), '*': (941, 1209), '0': (941, 1336), '#': (941, 1477), } retVal = [] for tone in tones: # Get the two maximum frequencies of the tone freqs = sorted(tone.harmonics.items(), key=lambda x: -x[1])[:2] # Re-order by frequency freqs = sorted(x[0] for x in freqs) distance = float('inf') this_symbol = 'X' for symbol, ref_freqs in DTMF.items(): # RSS this_error = (freqs[0] - ref_freqs[0])**2 + (freqs[1] - ref_freqs[1])**2 if this_error < distance: distance = this_error this_symbol = symbol retVal.append(this_symbol) print(freqs) return retVal if __name__ == '__main__': rate, data = read('DTMF-562-4897.wav') tones = getTones(data, rate) out = mergeTones(tones) number = decodeDTMF(out) print(number)