Skip to content

Instantly share code, notes, and snippets.

@soravux
Created May 25, 2014 01:14
Show Gist options
  • Save soravux/1ce124315bc6d1f3d430 to your computer and use it in GitHub Desktop.
Save soravux/1ce124315bc6d1f3d430 to your computer and use it in GitHub Desktop.

Revisions

  1. soravux created this gist May 25, 2014.
    115 changes: 115 additions & 0 deletions dtmf_decode.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,115 @@
    from scipy.io.wavfile import read, write
    from scipy.signal import find_peaks_cwt, firwin, lfilter, medfilt, resample, hamming
    from scipy.optimize import fmin
    import numpy as np
    from matplotlib import pyplot as plt

    import argparse


    class Tone:
    def __init__(self, begin_time=0.0, end_time=0.0, harmonics=None):
    # Unreadable but quick, cheap & dirty trick
    self.__dict__.update(**locals())

    def __repr__(self):
    return "{begin_time}-{end_time}: {sorted_harmonics}".format(
    sorted_harmonics=sorted(self.harmonics.items()),
    **self.__dict__
    )


    def getTones(data, rate):
    """Get list of tones recognized in the data array."""
    norm_data = data.astype('double') / ( 2 ** 15 - 1)
    x_axis = np.arange(len(norm_data)) / rate

    Pxx, ori_freqs, t, x = plt.specgram(data, NFFT=1024, noverlap=0, Fs=rate)

    tones = []
    for blk_idx in range(len(Pxx[0])):

    fft_block = np.array(list(zip(*Pxx))[blk_idx])[:60]
    fft_block, freqs = resample(fft_block, 10*len(fft_block), ori_freqs[:60], window='hamming')
    # Find peaks
    peak_idx = find_peaks_cwt(fft_block[:len(fft_block)/1.8], np.arange(3, 20))
    peak_idx = np.array(peak_idx)
    if len(peak_idx) and np.max(fft_block[peak_idx]) > 100.0:
    tone = Tone(
    blk_idx * 512 / 44100.0,
    ((blk_idx + 1) * 512 - 1) / 44100.0,
    { freqs[freq]: fft_block[freq] for freq in peak_idx },
    )
    tones.append(tone)

    return tones

    def mergeTones(tones, freqThr=0.06):
    """
    Merge tones: begin and end times adjusted and average their harmonics.
    Two tones are considered the same if their main (most powerful) frequency
    are in a freqThr% difference.
    Input list must be ordered by time.
    """
    if not tones:
    return []

    main_harm = lambda x: max(x.harmonics.items(), key=lambda y: y[1])
    main_freq = lambda x: float(main_harm(x)[0])
    retVal = [tones[0]]
    for tone in tones[1:]:
    # If frequencies in both signals are near and proeminent
    for harm_freq, harm_ampli in tone.harmonics.items():
    near_powerful = harm_ampli > 0.5 * main_harm(retVal[-1])[1]
    if not near_powerful:
    continue
    pdiff = abs(main_freq(retVal[-1]) - harm_freq) / main_freq(retVal[-1])
    same_tone = pdiff < freqThr
    if same_tone:
    retVal[-1].end_time = tone.end_time
    break
    else:
    retVal.append(tone)

    return retVal

    def decodeDTMF(tones):
    DTMF = {
    '1': (697, 1209),
    '2': (697, 1336),
    '3': (697, 1477),
    '4': (770, 1209),
    '5': (770, 1336),
    '6': (770, 1477),
    '7': (852, 1209),
    '8': (852, 1336),
    '9': (852, 1477),
    '*': (941, 1209),
    '0': (941, 1336),
    '#': (941, 1477),
    }
    retVal = []
    for tone in tones:
    # Get the two maximum frequencies of the tone
    freqs = sorted(tone.harmonics.items(), key=lambda x: -x[1])[:2]
    # Re-order by frequency
    freqs = sorted(x[0] for x in freqs)
    distance = float('inf')
    this_symbol = 'X'
    for symbol, ref_freqs in DTMF.items():
    # RSS
    this_error = (freqs[0] - ref_freqs[0])**2 + (freqs[1] - ref_freqs[1])**2
    if this_error < distance:
    distance = this_error
    this_symbol = symbol
    retVal.append(this_symbol)
    print(freqs)
    return retVal


    if __name__ == '__main__':
    rate, data = read('DTMF-562-4897.wav')
    tones = getTones(data, rate)
    out = mergeTones(tones)
    number = decodeDTMF(out)
    print(number)