Skip to content

Instantly share code, notes, and snippets.

@sixtenbe
Forked from endolith/peakdet.m
Last active October 22, 2025 04:06
Show Gist options
  • Select an option

  • Save sixtenbe/1178136 to your computer and use it in GitHub Desktop.

Select an option

Save sixtenbe/1178136 to your computer and use it in GitHub Desktop.

Revisions

  1. sixtenbe revised this gist May 11, 2016. 3 changed files with 242 additions and 62 deletions.
    57 changes: 27 additions & 30 deletions peakdetect.py
    Original file line number Diff line number Diff line change
    @@ -83,8 +83,8 @@ def _peakdetect_parabola_fitter(raw_peaks, x_axis, y_axis, points):
    Performs the actual parabola fitting for the peakdetect_parabola function.
    keyword arguments:
    raw_peaks -- A list of either the maximum or the minimum peaks, as given
    by the peakdetect_zero_crossing function, with index used as x-axis
    raw_peaks -- A list of either the maxima or the minima peaks, as given
    by the peakdetect functions, with index used as x-axis
    x_axis -- A numpy array of all the x values
    @@ -304,12 +304,14 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 20):
    zero_indices = zero_crossings(y_axis, window_len = 11)
    #select a n amount of periods
    last_indice = - 1 - (1 - len(zero_indices) & 1)
    ###
    # Calculate the fft between the first and last zero crossing
    # this method could be ignored if the beginning and the end of the signal
    # are unnecessary as any errors induced from not using whole periods
    # should mainly manifest in the beginning and the end of the signal, but
    # not in the rest of the signal
    # this is also unnecessary if the given data is a amount of whole periods
    # this is also unnecessary if the given data is an amount of whole periods
    ###
    fft_data = fft(y_axis[zero_indices[0]:zero_indices[last_indice]])
    padd = lambda x, c: x[:len(x) // 2] + [0] * c + x[len(x) // 2:]
    n = lambda x: int(log(x)/log(2)) + 1
    @@ -335,20 +337,6 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 20):
    data_len += 1 - data_len & 1


    fitted_wave = []
    for peaks in [max_peaks, min_peaks]:
    peak_fit_tmp = []
    index = 0
    for peak in peaks:
    index = np.where(x_axis_ifft[index:]==peak[0])[0][0] + index
    x_fit_lim = x_axis_ifft[index - data_len // 2:
    index + data_len // 2 + 1]
    y_fit_lim = y_axis_ifft[index - data_len // 2:
    index + data_len // 2 + 1]

    peak_fit_tmp.append([x_fit_lim, y_fit_lim])
    fitted_wave.append(peak_fit_tmp)

    return [max_peaks, min_peaks]


    @@ -732,7 +720,8 @@ def _smooth(x, window_len=11, window="hanning"):
    return y


    def zero_crossings(y_axis, window_len = 11, window_f="hanning"):
    def zero_crossings(y_axis, window_len = 11,
    window_f="hanning", offset_corrected=False):
    """
    Algorithm to find zero crossings. Smooths the curve and finds the
    zero-crossings by looking for a sign change.
    @@ -747,31 +736,39 @@ def zero_crossings(y_axis, window_len = 11, window_f="hanning"):
    window_f -- the type of window from 'flat', 'hanning', 'hamming',
    'bartlett', 'blackman' (default: 'hanning')
    offset_corrected -- Used for recursive calling to remove offset when needed
    return: the index for each zero-crossing
    """
    # smooth the curve
    length = len(y_axis)
    x_axis = np.asarray(range(length), int)

    # discard tail of smoothed signal
    y_axis = _smooth(y_axis, window_len, window_f)[:length]
    zero_crossings = np.where(np.diff(np.sign(y_axis)))[0]
    indices = [x_axis[index] for index in zero_crossings]
    indices = np.where(np.diff(np.sign(y_axis)))[0]

    # check if zero-crossings are valid
    diff = np.diff(indices)
    if diff.std() / diff.mean() > 0.2:
    print diff.std() / diff.mean()
    print np.diff(indices)
    if diff.std() / diff.mean() > 0.1:
    #Possibly bad zero crossing, see if it's offsets
    if ((diff[::2].std() / diff[::2].mean()) < 0.1 and
    (diff[1::2].std() / diff[1::2].mean()) < 0.1 and
    not offset_corrected):
    #offset present attempt to correct by subtracting the average
    offset = np.mean([y_axis.max(), y_axis.min()])
    return zero_crossings(y_axis-offset, window_len, window_f, True)
    #Invalid zero crossings and the offset has been removed
    print(diff.std() / diff.mean())
    print(np.diff(indices))
    raise ValueError(
    "False zero-crossings found, indicates problem {0!s} or {1!s}".format(
    "with smoothing window", "problem with offset"))
    "with smoothing window", "unhandled problem with offset"))
    # check if any zero crossings were found
    if len(zero_crossings) < 1:
    if len(indices) < 1:
    raise ValueError("No zero crossings found")

    return indices
    #remove offset from indices due to filter function when returning
    return indices - (window_len // 2 - 1)
    # used this to test the fft function's sensitivity to spectral leakage
    #return indices + np.asarray(30 * np.random.randn(len(indices)), int)

    @@ -780,8 +777,8 @@ def zero_crossings(y_axis, window_len = 11, window_f="hanning"):
    # time_p_period = diff.mean()
    #
    # if diff.std() / time_p_period > 0.1:
    # raise ValueError,
    # "smoothing window too small, false zero-crossing found"
    # raise ValueError(
    # "smoothing window too small, false zero-crossing found")
    #
    # #return frequency
    # return 1.0 / time_p_period
    24 changes: 16 additions & 8 deletions readme.txt
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,8 @@
    Musings about the peakdetect function by Sixten Bergman
    Musings about the peakdetect functions by Sixten Bergman
    Note that this code should work with both python 2.7 and python3.x.


    All the peak detection function in __all__ of peakdetect.py will work on
    All the peak detection functions in __all__ of peakdetect.py will work on
    consistent waveforms, but only peakdetect.peakdetect can properly handle
    offsets.

    @@ -22,24 +23,31 @@ this as far as I can tell is that the scipy.optimize.curve_fit can't optimize
    the variables.


    For parabola fit to function well it must be fitted to a small section of the
    For parabola fit to function well, it must be fitted to a small section of the
    peak as the curvature will start to mismatch with the function, but this also
    means that parabola should be quite sensitive to noise
    means that the parabola should be quite sensitive to noise


    FFT interpolation has between 0 to 2 orders of magnitude improvement over a
    raw peak fit. To obtain this improvement the wave needs to be heavily padded
    in length

    Spline seems to have similar performance to a FFT interpolation of the time domain.
    Spline does however seem to be better at estimating amplitude than the FFT method,
    but is unknown if this will hold true for wave-shapes that are noisy.
    Spline seems to have similar performance to a FFT interpolation of the time
    domain. Spline does however seem to be better at estimating amplitude than the
    FFT method, but it is unknown whether this will hold true for wave-shapes that are
    noisy.

    It should also be noted that the errors as given in "Missmatch data.txt"
    generated by the test routine are for pure functions with no noise, so the only
    errors being reduced by the "non-raw" peakdetect functions are errors stemming
    from low time resolution and are in no way an indication of how the functions can
    handle any kind of noise that real signals will have.


    Automatic tests for sine fitted peak detection are disabled due to its problems



    Avoid using the following function as they're questionable in performance:
    Avoid using the following functions as they're questionable in performance:
    peakdetect_sine
    peakdetect_sine_locked
    223 changes: 199 additions & 24 deletions test.py
    Original file line number Diff line number Diff line change
    @@ -21,14 +21,22 @@
    linspace_standard = np.linspace(0, 0.10, 1000)
    linspace_peakdetect = np.linspace(0, 0.10, 10000)

    def prng():
        """
        Create a numpy random number generator with a known, fixed starting
        state so that tests built on random data are reproducible.

        return: a random number generator
        """
        fixed_seed = 773889874
        return np.random.RandomState(fixed_seed)


    def _write_log(file, header, message):
    with open(file, 'ab') as f:
    with open(file, "ab") as f:
    f.write(header)
    f.write('\n')
    f.write("\n")
    f.writelines(message)
    f.write('\n')
    f.write('\n')
    f.write("\n")
    f.write("\n")


    def _calculate_missmatch(received, expected):
    @@ -59,7 +67,17 @@ def _log_diff(t_max, y_max,
    file, name
    ):
    """
    keyword arguments:
    t_max -- time of maxima
    y_max -- amplitude of maxima
    t_min -- time of minima
    y_min -- amplitude of maxima
    t_max_expected -- expected time of maxima
    y_max_expected -- expected amplitude of maxima
    t_min_expected -- expected time of minima
    y_min_expected -- expected amplitude of maxima
    file -- log file to write to
    name -- name of the test performed
    """
    t_diff_h, a_diff_h = _calculate_missmatch([t_max, y_max],
    [t_max_expected, y_max_expected])
    @@ -68,14 +86,18 @@ def _log_diff(t_max, y_max,
    t_diff_l, a_diff_l = _calculate_missmatch([t_min, y_min],
    [t_min_expected, y_min_expected])

    #data = ['\t{0:.2e}\t{1:.2e}\t{2:.2e}\t{3:.2e}'.format(*d) for d in
    #data = ["\t{0:.2e}\t{1:.2e}\t{2:.2e}\t{3:.2e}".format(*d) for d in
    # [t_diff_h, t_diff_l, a_diff_h, a_diff_l]
    # ]
    data = ['\t{0}'.format('\t'.join(map('{0:.2e}'.format, d))) for d in
    [t_diff_h, t_diff_l, a_diff_h, a_diff_l]
    frt = "val:{0} error:{1:.2e}"
    data = ["\t{0}".format("\t".join(map(frt.format, val, err))) for val, err in
    [(t_max, t_diff_h),
    (t_min, t_diff_l),
    (y_max, a_diff_h),
    (y_min, a_diff_l)]
    ]

    _write_log(file, name, '\n'.join(data))
    _write_log(file, name, "\n".join(data))



    @@ -93,11 +115,17 @@ def _is_close(max_p, min_p,
    expected_min -- expected location and value of minima
    atol_time -- absolute tolerance of location of vertex
    tol_ampl -- relative tolerance of value of vertex
    file -- log file to write to
    name -- name of the test performed
    """
    if len(max_p) == 5:
    t_max_expected, y_max_expected = zip(*expected_max)
    else:
    t_max_expected, y_max_expected = zip(*expected_max[1:])
    if abs(max_p[0][0] - expected_max[0][0]) > 0.001:
    t_max_expected, y_max_expected = zip(*expected_max[1:])
    else:
    t_max_expected, y_max_expected = zip(*expected_max[:-1])

    if len(min_p) == 5:
    t_min_expected, y_min_expected = zip(*expected_min)
    else:
    @@ -243,11 +271,11 @@ def test_peak_ACV1(self):
    peak_pos = 1000*np.sqrt(2) #1414.2135623730951
    peak_neg = -peak_pos
    expected_max = [
    [0.005, peak_pos],
    [0.025, peak_pos],
    [0.045, peak_pos],
    [0.065, peak_pos],
    [0.085, peak_pos]
    (0.005, peak_pos),
    (0.025, peak_pos),
    (0.045, peak_pos),
    (0.065, peak_pos),
    (0.085, peak_pos)
    ]
    expected_min = [
    (0.015, peak_neg),
    @@ -265,15 +293,15 @@ def test_peak_ACV1(self):
    atol_time, tol_ampl
    )

    def _test_peak_ACV2(self):
    def test_peak_ACV2(self):
    peak_pos = 1000*np.sqrt(2) + 500 #1414.2135623730951 + 500
    peak_neg = (-1000*np.sqrt(2)) + 500 #-914.2135623730951
    expected_max = [
    [0.005, peak_pos],
    [0.025, peak_pos],
    [0.045, peak_pos],
    [0.065, peak_pos],
    [0.085, peak_pos]
    (0.005, peak_pos),
    (0.025, peak_pos),
    (0.045, peak_pos),
    (0.065, peak_pos),
    (0.085, peak_pos)
    ]
    expected_min = [
    (0.015, peak_neg),
    @@ -283,7 +311,7 @@ def _test_peak_ACV2(self):
    (0.095, peak_neg)
    ]
    atol_time = 1e-5
    tol_ampl = 1e-6
    tol_ampl = 2e-6

    self._test_peak_template(analytic_wfm.ACV_A2,
    expected_max, expected_min,
    @@ -294,6 +322,8 @@ def _test_peak_ACV2(self):

    def test_peak_ACV3(self):
    """
    Sine wave with a 3rd overtone
    WolframAlpha solution
    max{y = sin(100 pi x)+0.05 sin(400 pi x+(2 pi)/3)}~~
    @@ -374,6 +404,143 @@ def peak_neg(n):
    atol_time, tol_ampl
    )

    def test_peak_ACV4(self):
    """
    Sine wave with a 4th overtone
    Expected data is from a numerical solution using 1e8 samples
    The numerical solution used about 2 GB memory and required 64-bit
    python
    Test is currently disabled as it pushes time index forward enough to
    change which peaks are discovered by peakdetect_fft, such that the last
    maxima is lost instead of the first one, which is expected from all the
    other functions
    """
    expected_max = [
    (0.0059351920593519207, 1409.2119572886963),
    (0.025935191259351911, 1409.2119572887088),
    (0.045935191459351918, 1409.2119572887223),
    (0.065935191659351911, 1409.2119572887243),
    (0.085935191859351917, 1409.2119572887166)
    ]
    expected_min = [
    (0.015935191159351911, -1409.2119572886984),
    (0.035935191359351915, -1409.2119572887166),
    (0.055935191559351914, -1409.2119572887245),
    (0.075935191759351914, -1409.2119572887223),
    (0.09593519195935192, -1409.2119572887068)
    ]
    atol_time = 1e-5
    tol_ampl = 2.5e-6
    #reduced tolerance since the expected values are only approximated

    self._test_peak_template(analytic_wfm.ACV_A4,
    expected_max, expected_min,
    "ACV4",
    atol_time, tol_ampl
    )


    def test_peak_ACV5(self):
    """
    Realistic triangle wave
    Easy enough to solve, but here is the numerical solution from 1e8
    samples. Numerical solution used about 2 GB memory and required
    64-bit python
    expected_max = [
    [0.0050000000500000008, 1598.0613254815967]
    [0.025000000250000001, 1598.0613254815778],
    [0.045000000450000008, 1598.0613254815346],
    [0.064999999650000001, 1598.0613254815594],
    [0.084999999849999994, 1598.0613254815908]
    ]
    expected_min = [
    [0.015000000150000001, -1598.0613254815908],
    [0.035000000350000005, -1598.0613254815594],
    [0.054999999549999998, -1598.0613254815346],
    [0.074999999750000004, -1598.0613254815778],
    [0.094999999949999997, -1598.0613254815967]
    ]
    """
    peak_pos = 1130*np.sqrt(2) #1598.0613254815976
    peak_neg = -1130*np.sqrt(2) #-1598.0613254815967
    expected_max = [
    (0.005, peak_pos),
    (0.025, peak_pos),
    (0.045, peak_pos),
    (0.065, peak_pos),
    (0.085, peak_pos)
    ]
    expected_min = [
    (0.015, peak_neg),
    (0.035, peak_neg),
    (0.055, peak_neg),
    (0.075, peak_neg),
    (0.095, peak_neg)
    ]
    atol_time = 1e-5
    tol_ampl = 4e-6

    self._test_peak_template(analytic_wfm.ACV_A5,
    expected_max, expected_min,
    "ACV5",
    atol_time, tol_ampl
    )


    def test_peak_ACV6(self):
    """
    Realistic triangle wave
    Easy enough to solve, but here is the numerical solution from 1e8
    samples. Numerical solution used about 2 GB memory and required
    64-bit python
    expected_max = [
    [0.0050000000500000008, 1485.6313472729362],
    [0.025000000250000001, 1485.6313472729255],
    [0.045000000450000008, 1485.6313472729012],
    [0.064999999650000001, 1485.6313472729153],
    [0.084999999849999994, 1485.6313472729323]
    ]
    expected_min = [
    [0.015000000150000001, -1485.6313472729323],
    [0.035000000350000005, -1485.6313472729153],
    [0.054999999549999998, -1485.6313472729012],
    [0.074999999750000004, -1485.6313472729255],
    [0.094999999949999997, -1485.6313472729362]
    ]
    """
    peak_pos = 1050.5*np.sqrt(2) #1485.6313472729364
    peak_neg = -1050.5*np.sqrt(2) #1485.6313472729255
    expected_max = [
    (0.005, peak_pos),
    (0.025, peak_pos),
    (0.045, peak_pos),
    (0.065, peak_pos),
    (0.085, peak_pos)
    ]
    expected_min = [
    (0.015, peak_neg),
    (0.035, peak_neg),
    (0.055, peak_neg),
    (0.075, peak_neg),
    (0.095, peak_neg)
    ]
    atol_time = 1e-5
    tol_ampl = 2.5e-6

    self._test_peak_template(analytic_wfm.ACV_A6,
    expected_max, expected_min,
    "ACV6",
    atol_time, tol_ampl
    )





    class Test_peakdetect(_Test_peakdetect_template):
    @@ -440,6 +607,14 @@ def test__pad(self):
    def test__n(self):
    self.assertEqual(2**peakdetect._n(1000), 1024)

    def test_zero_crossings(self):
    y = analytic_wfm.ACV_A1(linspace_peakdetect)
    expected_indice = [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000]
    indice = peakdetect.zero_crossings(y, 50)
    msg = "index:{0:d} should be within 1 of expected:{1:d}"
    for rec, exp in zip(indice, expected_indice):
    self.assertAlmostEqual(rec, exp, delta=1, msg=msg.format(rec, exp))




    @@ -449,9 +624,9 @@ def test__n(self):



    if __name__ == '__main__':
    if __name__ == "__main__":
    tests_to_run = [
    Test_analytic_wfm,
    #Test_analytic_wfm,
    Test_peakdetect,
    Test_peakdetect_parabola,
    Test_peakdetect_fft,
  2. sixtenbe revised this gist Apr 23, 2016. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions analytic_wfm.py
    Original file line number Diff line number Diff line change
    @@ -61,7 +61,7 @@ def ACV_A2(T, Hz=50):

    def ACV_A3(T, Hz=50):
    """
    Generate a
    Generate a fundamental with a 3rd overtone
    keyword arguments:
    T -- time points to generate the waveform given in seconds
    @@ -76,7 +76,7 @@ def ACV_A3(T, Hz=50):

    def ACV_A4(T, Hz=50):
    """
    Generate a
    Generate a fundamental with a 4th overtone
    keyword arguments:
    T -- time points to generate the waveform given in seconds
  3. sixtenbe revised this gist Apr 23, 2016. 4 changed files with 1142 additions and 182 deletions.
    203 changes: 203 additions & 0 deletions analytic_wfm.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,203 @@
    #!/usr/bin/python2


    # Copyright (C) 2016 Sixten Bergman
    # License WTFPL
    #
    # This program is free software. It comes without any warranty, to the extent
    # permitted by applicable law.
    # You can redistribute it and/or modify it under the terms of the Do What The
    # Fuck You Want To Public License, Version 2, as published by Sam Hocevar. See
    # http://www.wtfpl.net/ for more details.
    #

    import numpy as np
    from math import pi, sqrt

    __all__ = [
    'ACV_A1',
    'ACV_A2',
    'ACV_A3',
    'ACV_A4',
    'ACV_A5',
    'ACV_A6',
    'ACV_A7',
    'ACV_A8'
    ]



    # Heaviside step function, scalar form: 1 for t > 0, else 0
    H_num = lambda t: 1 if t > 0 else 0
    # Heaviside step function, vectorized form: maps an iterable of time
    # points to a numpy array of 0/1 values
    H = lambda T: np.asarray([1 if t > 0 else 0 for t in T])

    # pure sine
    def ACV_A1(T, Hz=50):
        """
        Generate a pure sine wave at a specified frequency

        keyword arguments:
        T -- time points to generate the waveform given in seconds
        Hz -- The desired frequency of the signal (default:50)
        """
        t = np.asarray(T, dtype=np.float64)
        # 1000 is the RMS amplitude; sqrt(2) scales it to the peak value
        peak = 1000 * sqrt(2)
        return peak * np.sin(2 * pi * Hz * t)


    def ACV_A2(T, Hz=50):
        """
        Generate a pure sine wave with a DC offset at a specified frequency

        keyword arguments:
        T -- time points to generate the waveform given in seconds
        Hz -- The desired frequency of the signal (default:50)
        """
        t = np.asarray(T, dtype=np.float64)
        # 1000 is the RMS amplitude; sqrt(2) scales it to the peak value
        peak = 1000 * sqrt(2)
        dc_offset = 500
        return peak * np.sin(2 * pi * Hz * t) + dc_offset


    def ACV_A3(T, Hz=50):
        """
        Generate a fundamental with a 3rd overtone

        keyword arguments:
        T -- time points to generate the waveform given in seconds
        Hz -- The desired frequency of the signal (default:50)
        """
        ampl = 1000
        T = np.asarray(T, dtype=np.float64)
        main_wave = np.sin(2*pi*Hz * T)
        # 3rd overtone: 4x the fundamental frequency, at 5% of the main
        # amplitude and phase shifted by 2*pi/3
        harmonic_wave = 0.05 * np.sin(2*pi*Hz * T * 4 + pi * 2 / 3)
        return ampl * sqrt(2) * (main_wave + harmonic_wave)


    def ACV_A4(T, Hz=50):
        """
        Generate a fundamental with a 4th overtone

        keyword arguments:
        T -- time points to generate the waveform given in seconds
        Hz -- The desired frequency of the signal (default:50)
        """
        ampl = 1000
        T = np.asarray(T, dtype=np.float64)
        main_wave = np.sin(2*pi*Hz * T)
        # 4th overtone: 5x the fundamental frequency, at 7% of the main
        # amplitude and phase shifted by 22*pi/18
        harmonic_wave = 0.07 * np.sin(2*pi*Hz * T * 5 + pi * 22 / 18)
        return ampl * sqrt(2) * (main_wave + harmonic_wave)


    def ACV_A5(T, Hz=50):
        """
        Generate a realistic triangle wave

        keyword arguments:
        T -- time points to generate the waveform given in seconds
        Hz -- The desired frequency of the signal (default:50)
        """
        # angular argument of the fundamental at each time point
        omega_t = 2 * pi * Hz * np.asarray(T, dtype=np.float64)
        # odd harmonics with decreasing amplitude approximate a triangle wave
        partials = (
            np.sin(omega_t),
            0.05 * np.sin(omega_t * 3 - pi),
            0.05 * np.sin(omega_t * 5),
            0.02 * np.sin(omega_t * 7 - pi),
            0.01 * np.sin(omega_t * 9),
        )
        return 1000 * sqrt(2) * sum(partials)


    def ACV_A6(T, Hz=50):
        """
        Generate a realistic triangle wave

        keyword arguments:
        T -- time points to generate the waveform given in seconds
        Hz -- The desired frequency of the signal (default:50)
        """
        # angular argument of the fundamental at each time point
        omega_t = 2 * pi * Hz * np.asarray(T, dtype=np.float64)
        # odd harmonics with decreasing amplitude approximate a triangle wave
        partials = (
            np.sin(omega_t),
            0.02 * np.sin(omega_t * 3 - pi),
            0.02 * np.sin(omega_t * 5),
            0.0015 * np.sin(omega_t * 7 - pi),
            0.009 * np.sin(omega_t * 9),
        )
        return 1000 * sqrt(2) * sum(partials)


    def ACV_A7(T, Hz=50):
        """
        Generate a growing sine wave: amplitude ramps from 0 at t=0 up to 0.9
        of full amplitude at 250 cycles (t=5), then rises linearly to full
        amplitude at 500 cycles (t=10), and is 0 for t > 10.
        Frequency is locked to 50 Hz regardless of the Hz argument.

        keyword arguments:
        T -- time points to generate the waveform given in seconds
        Hz -- The desired frequency of the signal (default:50)
        """
        Hz = 50  # frequency is fixed; the argument is ignored
        t = np.asarray(T, dtype=np.float64)
        carrier = np.sin(2 * pi * Hz * t)
        # ramp: 0 -> 0.9 over the first 5 seconds
        ramp = 0.9 * t / 5 * H(5 - t)
        # plateau: 0.9 -> 1.0 between 5 and 10 seconds, 0 afterwards
        plateau = H(t - 5) * H(10 - t) * (0.9 + 0.1 * (t - 5) / 5)
        return 1000 * sqrt(2) * carrier * (ramp + plateau)

    def ACV_A8(T, Hz=50):
        """
        Generate a growing sine wave, which reaches 100 times the amplitude at
        500 cycles.
        The Hz argument is not implemented (fixed at 50); the signal is 0 at
        t > 1000*pi
        signal frequency = 0.15915494309189535 Hz?

        keyword arguments:
        T -- time points to generate the waveform given in seconds
        Hz -- The desired frequency of the signal (default:50)
        """
        Hz = 50  # fixed; the argument is ignored
        t = np.asarray(T, dtype=np.float64)
        carrier = np.sin(t)
        # linearly growing envelope, cut to zero once t / (2*pi*Hz) exceeds 10
        envelope = t / (10 * pi) * H(10 - t / (2 * pi * Hz))
        return 1000 * sqrt(2) * carrier * envelope


    # pure sine (lambda form of ACV_A1)
    _ACV_A1_L = lambda T, Hz = 50: 1000 * sqrt(2) * np.sin(2*pi*Hz * T)
    # pure sine with DC offset (lambda form of ACV_A2)
    _ACV_A2_L = lambda T, Hz = 50: 1000 * sqrt(2) * np.sin(2*pi*Hz * T) + 500
    # fundamental plus 3rd overtone (lambda form of ACV_A3)
    _ACV_A3_L = lambda T, Hz = 50: 1000 * sqrt(2) * (np.sin(2*pi*Hz * T) +
        0.05 * np.sin(2*pi*Hz * T * 4 + pi * 2 / 3))
    # fundamental plus 4th overtone (lambda form of ACV_A4)
    _ACV_A4_L = lambda T, Hz = 50:( 1000 * sqrt(2) * (np.sin(2*pi*Hz * T) +
        0.07 * np.sin(2*pi*Hz * T * 5 + pi * 22 / 18)))

    # Realistic triangle (lambda form of ACV_A5)
    _ACV_A5_L = lambda T, Hz = 50:( 1000 * sqrt(2) * (np.sin(2*pi*Hz * T) +
        0.05 * np.sin(2*pi*Hz * T * 3 - pi) +
        0.05 * np.sin(2*pi*Hz * T * 5) +
        0.02 * np.sin(2*pi*Hz * T * 7 - pi) +
        0.01 * np.sin(2*pi*Hz * T * 9)))
    # Realistic triangle with smaller harmonics (lambda form of ACV_A6)
    _ACV_A6_L = lambda T, Hz = 50:( 1000 * sqrt(2) * (np.sin(2*pi*Hz * T) +
        0.02 * np.sin(2*pi*Hz * T * 3 - pi) +
        0.02 * np.sin(2*pi*Hz * T * 5) +
        0.0015 * np.sin(2*pi*Hz * T * 7 - pi) +
        0.009 * np.sin(2*pi*Hz * T * 9)))

    # Old forms of A7 & A8: convert so that an input of 16*pi corresponds to
    # an input of 0.25 in the current version
    _ACV_A7_OLD = lambda T: [1000 * sqrt(2) * np.sin(100 * pi * t) *
        (0.9 * t / 5 * H_num(5-t) + H_num(t-5) * H_num(10-t) * (0.9 + 0.1 * (t-5) / 5)) for t in T]
    _ACV_A8_OLD = lambda T: [1000 * sqrt(2) * np.sin(t) *
        t / (10 * pi) * H_num(10 - t / (100 * pi)) for t in T]



    if __name__ == "__main__":
        # smoke test: create 1 period of the realistic triangle wave
        x = np.linspace(0, 0.02, 4000)
        y = ACV_A5(x)


    609 changes: 427 additions & 182 deletions peakdetect.py
    Original file line number Diff line number Diff line change
    @@ -1,60 +1,122 @@
    import numpy as np
    #!/usr/bin/python2


    # Copyright (C) 2016 Sixten Bergman
    # License WTFPL
    #
    # This program is free software. It comes without any warranty, to the extent
    # permitted by applicable law.
    # You can redistribute it and/or modify it under the terms of the Do What The
    # Fuck You Want To Public License, Version 2, as published by Sam Hocevar. See
    # http://www.wtfpl.net/ for more details.
    #
    # note that the function peakdetect is derived from code which was released to
    # public domain see: http://billauer.co.il/peakdet.html
    #

    import logging
    from math import pi, log
    import numpy as np
    import pylab
    from scipy import fft, ifft
    from scipy.optimize import curve_fit
    from scipy.signal import cspline1d_eval, cspline1d

    __all__ = [
    "peakdetect",
    "peakdetect_fft",
    "peakdetect_parabola",
    "peakdetect_sine",
    "peakdetect_sine_locked",
    "peakdetect_spline",
    "peakdetect_zero_crossing",
    "zero_crossings",
    "zero_crossings_sine_fit"
    ]

    i = 10000
    x = np.linspace(0, 3.5 * pi, i)
    y = (0.3*np.sin(x) + np.sin(1.3 * x) + 0.9 * np.sin(4.2 * x) + 0.06 *
    np.random.randn(i))


    def _datacheck_peakdetect(x_axis, y_axis):
    if x_axis is None:
    x_axis = range(len(y_axis))

    if len(y_axis) != len(x_axis):
    raise (ValueError,
    'Input vectors y_axis and x_axis must have same length')
    raise ValueError(
    "Input vectors y_axis and x_axis must have same length")

    #needs to be a numpy array
    y_axis = np.array(y_axis)
    x_axis = np.array(x_axis)
    return x_axis, y_axis

    def _peakdetect_parabole_fitter(raw_peaks, x_axis, y_axis, points):

    def _pad(fft_data, pad_len):
        """
        Pads fft data to interpolate in time domain

        keyword arguments:
        fft_data -- the fft
        pad_len -- By how many times the time resolution should be increased by

        return: padded list
        """
        length = len(fft_data)
        # smallest power-of-two exponent large enough for the padded length
        exponent = _n(length * pad_len)
        half = length // 2
        data = list(fft_data)
        # insert the zeros in the middle of the spectrum, between the
        # positive- and negative-frequency halves
        return data[:half] + [0] * (2 ** exponent - length) + data[half:]

    def _n(x):
    """
    Find the smallest value for n, which fulfils 2**n >= x
    keyword arguments:
    x -- the value, which 2**n must surpass
    return: the integer n
    """
    Performs the actual parabole fitting for the peakdetect_parabole function.
    return int(log(x)/log(2)) + 1


    def _peakdetect_parabola_fitter(raw_peaks, x_axis, y_axis, points):
    """
    Performs the actual parabola fitting for the peakdetect_parabola function.
    keyword arguments:
    raw_peaks -- A list of either the maximium or the minimum peaks, as given
    raw_peaks -- A list of either the maximum or the minimum peaks, as given
    by the peakdetect_zero_crossing function, with index used as x-axis
    x_axis -- A numpy list of all the x values
    y_axis -- A numpy list of all the y values
    x_axis -- A numpy array of all the x values
    y_axis -- A numpy array of all the y values
    points -- How many points around the peak should be used during curve
    fitting, must be odd.
    return -- A list giving all the peaks and the fitted waveform, format:
    return: A list giving all the peaks and the fitted waveform, format:
    [[x, y, [fitted_x, fitted_y]]]
    """
    func = lambda x, k, tau, m: k * ((x - tau) ** 2) + m
    func = lambda x, a, tau, c: a * ((x - tau) ** 2) + c
    fitted_peaks = []
    distance = abs(x_axis[raw_peaks[1][0]] - x_axis[raw_peaks[0][0]]) / 4
    for peak in raw_peaks:
    index = peak[0]
    x_data = x_axis[index - points // 2: index + points // 2 + 1]
    y_data = y_axis[index - points // 2: index + points // 2 + 1]
    # get a first approximation of tau (peak position in time)
    tau = x_axis[index]
    # get a first approximation of peak amplitude
    m = peak[1]
    c = peak[1]
    a = np.sign(c) * (-1) * (np.sqrt(abs(c))/distance)**2
    """Derived from ABC formula to result in a solution where A=(rot(c)/t)**2"""

    # build list of approximations
    # k = -m as first approximation?
    p0 = (-m, tau, m)

    p0 = (a, tau, c)
    popt, pcov = curve_fit(func, x_data, y_data, p0)
    # retrieve tau and m i.e x and y value of peak
    # retrieve tau and c i.e x and y value of peak
    x, y = popt[1:3]

    # create a high resolution data set for the fitted waveform
    @@ -66,37 +128,53 @@ def _peakdetect_parabole_fitter(raw_peaks, x_axis, y_axis, points):
    return fitted_peaks


    def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0):
    def peakdetect_parabole(*args, **kwargs):
    """
    Misspelling of peakdetect_parabola
    function is deprecated please use peakdetect_parabola
    """
    logging.warn("peakdetect_parabole is deprecated due to misspelling use: peakdetect_parabola")

    return peakdetect_parabola(*args, **kwargs)


    def peakdetect(y_axis, x_axis = None, lookahead = 200, delta=0):
    """
    Converted from/based on a MATLAB script at:
    http://billauer.co.il/peakdet.html
    function for detecting local maximas and minmias in a signal.
    function for detecting local maxima and minima in a signal.
    Discovers peaks by searching for values which are surrounded by lower
    or larger values for maximas and minimas respectively
    or larger values for maxima and minima respectively
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    x_axis -- (optional) A x-axis whose values correspond to the y_axis list
    and is used in the return to specify the postion of the peaks. If
    omitted an index of the y_axis is used. (default: None)
    lookahead -- (optional) distance to look ahead from a peak candidate to
    determine if it is the actual peak (default: 200)
    '(sample / period) / f' where '4 >= f >= 1.25' might be a good value
    delta -- (optional) this specifies a minimum difference between a peak and
    y_axis -- A list containing the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the y_axis list and is used
    in the return to specify the position of the peaks. If omitted an
    index of the y_axis is used.
    (default: None)
    lookahead -- distance to look ahead from a peak candidate to determine if
    it is the actual peak
    (default: 200)
    '(samples / period) / f' where '4 >= f >= 1.25' might be a good value
    delta -- this specifies a minimum difference between a peak and
    the following points, before a peak may be considered a peak. Useful
    to hinder the function from picking up false peaks towards to end of
    the signal. To work well delta should be set to delta >= RMSnoise * 5.
    (default: 0)
    delta function causes a 20% decrease in speed, when omitted
    Correctly used it can double the speed of the function
    When omitted delta function causes a 20% decrease in speed.
    When used Correctly it can double the speed of the function
    return -- two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tupple
    return: two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tuple
    of: (position, peak_value)
    to get the average peak value do: np.mean(max_peaks, 0)[1] on the
    results to unpack one of the lists into x, y coordinates do:
    x, y = zip(*tab)
    x, y = zip(*max_peaks)
    """
    max_peaks = []
    min_peaks = []
    @@ -110,9 +188,9 @@ def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0):

    #perform some checks
    if lookahead < 1:
    raise ValueError, "Lookahead must be '1' or above in value"
    raise ValueError("Lookahead must be '1' or above in value")
    if not (np.isscalar(delta) and delta >= 0):
    raise ValueError, "delta must be a positive number"
    raise ValueError("delta must be a positive number")

    #maxima and minima candidates are temporarily stored in
    #mx and mn respectively
    @@ -178,7 +256,7 @@ def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0):
    return [max_peaks, min_peaks]


    def peakdetect_fft(y_axis, x_axis, pad_len = 5):
    def peakdetect_fft(y_axis, x_axis, pad_len = 20):
    """
    Performs a FFT calculation on the data and zero-pads the results to
    increase the time domain resolution after performing the inverse fft and
    @@ -196,41 +274,46 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 5):
    The biggest time eater in this function is the ifft and thereafter it's
    the 'peakdetect' function which takes only half the time of the ifft.
    Speed improvementd could include to check if 2**n points could be used for
    Speed improvements could include to check if 2**n points could be used for
    fft and ifft or change the 'peakdetect' to the 'peakdetect_zero_crossing',
    which is maybe 10 times faster than 'peakdetct'. The pro of 'peakdetect'
    is that it resutls in one less lost peak. It should also be noted that the
    is that it results in one less lost peak. It should also be noted that the
    time used by the ifft function can change greatly depending on the input.
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    y_axis -- A list containing the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the y_axis list and is used
    in the return to specify the postion of the peaks.
    pad_len -- (optional) By how many times the time resolution should be
    in the return to specify the position of the peaks.
    pad_len -- By how many times the time resolution should be
    increased by, e.g. 1 doubles the resolution. The amount is rounded up
    to the nearest 2 ** n amount (default: 5)
    to the nearest 2**n amount
    (default: 20)
    return -- two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tupple
    return: two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tuple
    of: (position, peak_value)
    to get the average peak value do: np.mean(max_peaks, 0)[1] on the
    results to unpack one of the lists into x, y coordinates do:
    x, y = zip(*tab)
    x, y = zip(*max_peaks)
    """
    # check input data
    x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis)
    zero_indices = zero_crossings(y_axis, window = 11)
    zero_indices = zero_crossings(y_axis, window_len = 11)
    #select a n amount of periods
    last_indice = - 1 - (1 - len(zero_indices) & 1)
    # Calculate the fft between the first and last zero crossing
    # this method could be ignored if the begining and the end of the signal
    # are discardable as any errors induced from not using whole periods
    # this method could be ignored if the beginning and the end of the signal
    # are unnecessary as any errors induced from not using whole periods
    # should mainly manifest in the beginning and the end of the signal, but
    # not in the rest of the signal
    # this is also unnecessary if the given data is a amount of whole periods
    fft_data = fft(y_axis[zero_indices[0]:zero_indices[last_indice]])
    padd = lambda x, c: x[:len(x) // 2] + [0] * c + x[len(x) // 2:]
    n = lambda x: int(log(x)/log(2)) + 1
    # padds to 2**n amount of samples
    # pads to 2**n amount of samples
    fft_padded = padd(list(fft_data), 2 **
    n(len(fft_data) * pad_len) - len(fft_data))

    @@ -266,40 +349,34 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 5):
    peak_fit_tmp.append([x_fit_lim, y_fit_lim])
    fitted_wave.append(peak_fit_tmp)

    #pylab.plot(range(len(fft_data)), fft_data)
    #pylab.show()

    pylab.plot(x_axis, y_axis)
    pylab.hold(True)
    pylab.plot(x_axis_ifft, y_axis_ifft)
    #for max_p in max_peaks:
    # pylab.plot(max_p[0], max_p[1], 'xr')
    pylab.show()
    return [max_peaks, min_peaks]


    def peakdetect_parabole(y_axis, x_axis, points = 9):
    def peakdetect_parabola(y_axis, x_axis, points = 31):
    """
    Function for detecting local maximas and minmias in a signal.
    Function for detecting local maxima and minima in a signal.
    Discovers peaks by fitting the model function: y = k (x - tau) ** 2 + m
    to the peaks. The amount of points used in the fitting is set by the
    points argument.
    Omitting the x_axis is forbidden as it would make the resulting x_axis
    value silly if it was returned as index 50.234 or similar.
    value silly, if it was returned as index 50.234 or similar.
    will find the same amount of peaks as the 'peakdetect_zero_crossing'
    function, but might result in a more precise value of the peak.
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    y_axis -- A list containing the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the y_axis list and is used
    in the return to specify the postion of the peaks.
    points -- (optional) How many points around the peak should be used during
    curve fitting, must be odd (default: 9)
    in the return to specify the position of the peaks.
    points -- How many points around the peak should be used during curve
    fitting (default: 31)
    return -- two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a list
    return: two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tuple
    of: (position, peak_value)
    to get the average peak value do: np.mean(max_peaks, 0)[1] on the
    results to unpack one of the lists into x, y coordinates do:
    @@ -318,33 +395,22 @@ def peakdetect_parabole(y_axis, x_axis, points = 9):
    max_peaks = []
    min_peaks = []

    max_ = _peakdetect_parabole_fitter(max_raw, x_axis, y_axis, points)
    min_ = _peakdetect_parabole_fitter(min_raw, x_axis, y_axis, points)
    max_ = _peakdetect_parabola_fitter(max_raw, x_axis, y_axis, points)
    min_ = _peakdetect_parabola_fitter(min_raw, x_axis, y_axis, points)

    max_peaks = map(lambda x: [x[0], x[1]], max_)
    max_fitted = map(lambda x: x[-1], max_)
    min_peaks = map(lambda x: [x[0], x[1]], min_)
    min_fitted = map(lambda x: x[-1], min_)


    #pylab.plot(x_axis, y_axis)
    #pylab.hold(True)
    #for max_p, max_f in zip(max_peaks, max_fitted):
    # pylab.plot(max_p[0], max_p[1], 'x')
    # pylab.plot(max_f[0], max_f[1], 'o', markersize = 2)
    #for min_p, min_f in zip(min_peaks, min_fitted):
    # pylab.plot(min_p[0], min_p[1], 'x')
    # pylab.plot(min_f[0], min_f[1], 'o', markersize = 2)
    #pylab.show()

    return [max_peaks, min_peaks]


    def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False):
    def peakdetect_sine(y_axis, x_axis, points = 31, lock_frequency = False):
    """
    Function for detecting local maximas and minmias in a signal.
    Function for detecting local maxima and minima in a signal.
    Discovers peaks by fitting the model function:
    y = A * sin(2 * pi * f * x - tau) to the peaks. The amount of points used
    y = A * sin(2 * pi * f * (x - tau)) to the peaks. The amount of points used
    in the fitting is set by the points argument.
    Omitting the x_axis is forbidden as it would make the resulting x_axis
    @@ -356,24 +422,29 @@ def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False):
    The function might have some problems if the sine wave has a
    non-negligible total angle i.e. a k*x component, as this messes with the
    internal offset calculation of the peaks, might be fixed by fitting a
    k * x + m function to the peaks for offset calculation.
    y = k * x + m function to the peaks for offset calculation.
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    y_axis -- A list containing the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the y_axis list and is used
    in the return to specify the postion of the peaks.
    points -- (optional) How many points around the peak should be used during
    curve fitting, must be odd (default: 9)
    lock_frequency -- (optional) Specifies if the frequency argument of the
    model function should be locked to the value calculated from the raw
    peaks or if optimization process may tinker with it. (default: False)
    return -- two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tupple
    in the return to specify the position of the peaks.
    points -- How many points around the peak should be used during curve
    fitting (default: 31)
    lock_frequency -- Specifies if the frequency argument of the model
    function should be locked to the value calculated from the raw peaks
    or if optimization process may tinker with it.
    (default: False)
    return: two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tuple
    of: (position, peak_value)
    to get the average peak value do: np.mean(max_peaks, 0)[1] on the
    results to unpack one of the lists into x, y coordinates do:
    x, y = zip(*tab)
    x, y = zip(*max_peaks)
    """
    # check input data
    x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis)
    @@ -393,23 +464,23 @@ def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False):
    # fitting a k * x + m function to the peaks might be better
    #offset_func = lambda x, k, m: k * x + m

    # calculate an approximate frequenzy of the signal
    Hz = []
    for raw in [max_raw, min_raw]:
    if len(raw) > 1:
    peak_pos = [x_axis[index] for index in zip(*raw)[0]]
    Hz.append(np.mean(np.diff(peak_pos)))
    Hz = 1 / np.mean(Hz)
    # calculate an approximate frequency of the signal
    Hz_h_peak = np.diff(zip(*max_raw)[0]).mean()
    Hz_l_peak = np.diff(zip(*min_raw)[0]).mean()
    Hz = 1 / np.mean([Hz_h_peak, Hz_l_peak])



    # model function
    # if cosine is used then tau could equal the x position of the peak
    # if sine were to be used then tau would be the first zero crossing
    if lock_frequency:
    func = lambda x, A, tau: A * np.sin(2 * pi * Hz * (x - tau) + pi / 2)
    func = lambda x_ax, A, tau: A * np.sin(
    2 * pi * Hz * (x_ax - tau) + pi / 2)
    else:
    func = lambda x, A, Hz, tau: A * np.sin(2 * pi * Hz * (x - tau) +
    pi / 2)
    #func = lambda x, A, Hz, tau: A * np.cos(2 * pi * Hz * (x - tau))
    func = lambda x_ax, A, Hz, tau: A * np.sin(
    2 * pi * Hz * (x_ax - tau) + pi / 2)
    #func = lambda x_ax, A, Hz, tau: A * np.cos(2 * pi * Hz * (x_ax - tau))


    #get peaks
    @@ -431,7 +502,7 @@ def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False):
    else:
    p0 = (A, Hz, tau)

    # subtract offset from waveshape
    # subtract offset from wave-shape
    y_data -= offset
    popt, pcov = curve_fit(func, x_data, y_data, p0)
    # retrieve tau and A i.e x and y value of peak
    @@ -458,69 +529,108 @@ def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False):
    min_fitted = map(lambda x: x[-1], fitted_peaks[1])


    #pylab.plot(x_axis, y_axis)
    #pylab.hold(True)
    #for max_p, max_f in zip(max_peaks, max_fitted):
    # pylab.plot(max_p[0], max_p[1], 'x')
    # pylab.plot(max_f[0], max_f[1], 'o', markersize = 2)
    #for min_p, min_f in zip(min_peaks, min_fitted):
    # pylab.plot(min_p[0], min_p[1], 'x')
    # pylab.plot(min_f[0], min_f[1], 'o', markersize = 2)
    #pylab.show()

    return [max_peaks, min_peaks]


    def peakdetect_sine_locked(y_axis, x_axis, points = 9):
    def peakdetect_sine_locked(y_axis, x_axis, points = 31):
    """
    Convinience function for calling the 'peakdetect_sine' function with
    Convenience function for calling the 'peakdetect_sine' function with
    the lock_frequency argument as True.
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    y_axis -- A list containing the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the y_axis list and is used
    in the return to specify the postion of the peaks.
    points -- (optional) How many points around the peak should be used during
    curve fitting, must be odd (default: 9)
    return -- see 'peakdetect_sine'
    in the return to specify the position of the peaks.
    points -- How many points around the peak should be used during curve
    fitting (default: 31)
    return: see the function 'peakdetect_sine'
    """
    return peakdetect_sine(y_axis, x_axis, points, True)


    def peakdetect_spline(y_axis, x_axis, pad_len=20):
    """
    Performs a b-spline interpolation on the data to increase resolution and
    send the data to the 'peakdetect_zero_crossing' function for peak
    detection.
    Omitting the x_axis is forbidden as it would make the resulting x_axis
    value silly if it was returned as the index 50.234 or similar.
    will find the same amount of peaks as the 'peakdetect_zero_crossing'
    function, but might result in a more precise value of the peak.
    keyword arguments:
    y_axis -- A list containing the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the y_axis list and is used
    in the return to specify the position of the peaks.
    x-axis must be equally spaced.
    pad_len -- By how many times the time resolution should be increased by,
    e.g. 1 doubles the resolution.
    (default: 20)
    return: two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tuple
    of: (position, peak_value)
    to get the average peak value do: np.mean(max_peaks, 0)[1] on the
    results to unpack one of the lists into x, y coordinates do:
    x, y = zip(*max_peaks)
    """
    # check input data
    x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis)
    # could perform a check if x_axis is equally spaced
    #if np.std(np.diff(x_axis)) > 1e-15: raise ValueError
    # perform spline interpolations
    dx = x_axis[1] - x_axis[0]
    x_interpolated = np.linspace(x_axis.min(), x_axis.max(), len(x_axis) * (pad_len + 1))
    cj = cspline1d(y_axis)
    y_interpolated = cspline1d_eval(cj, x_interpolated, dx=dx,x0=x_axis[0])
    # get peaks
    max_peaks, min_peaks = peakdetect_zero_crossing(y_interpolated, x_interpolated)

    return [max_peaks, min_peaks]

    def peakdetect_zero_crossing(y_axis, x_axis = None, window = 11):
    """
    Function for detecting local maximas and minmias in a signal.
    Function for detecting local maxima and minima in a signal.
    Discovers peaks by dividing the signal into bins and retrieving the
    maximum and minimum value of each the even and odd bins respectively.
    Division into bins is performed by smoothing the curve and finding the
    zero crossings.
    Suitable for repeatable signals, where some noise is tolerated. Excecutes
    Suitable for repeatable signals, where some noise is tolerated. Executes
    faster than 'peakdetect', although this function will break if the offset
    of the signal is too large. It should also be noted that the first and
    last peak will probably not be found, as this function only can find peaks
    between the first and last zero crossing.
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    x_axis -- (optional) A x-axis whose values correspond to the y_axis list
    and is used in the return to specify the postion of the peaks. If
    omitted an index of the y_axis is used. (default: None)
    y_axis -- A list containing the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the y_axis list
    and is used in the return to specify the position of the peaks. If
    omitted an index of the y_axis is used.
    (default: None)
    window -- the dimension of the smoothing window; should be an odd integer
    (default: 11)
    return -- two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tupple
    return: two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tuple
    of: (position, peak_value)
    to get the average peak value do: np.mean(max_peaks, 0)[1] on the
    results to unpack one of the lists into x, y coordinates do:
    x, y = zip(*tab)
    x, y = zip(*max_peaks)
    """
    # check input data
    x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis)

    zero_indices = zero_crossings(y_axis, window = window)
    zero_indices = zero_crossings(y_axis, window_len = window)
    period_lengths = np.diff(zero_indices)

    bins_y = [y_axis[index:index + diff] for index, diff in
    @@ -559,25 +669,27 @@ def peakdetect_zero_crossing(y_axis, x_axis = None, window = 11):
    return [max_peaks, min_peaks]


    def _smooth(x, window_len=11, window='hanning'):
    def _smooth(x, window_len=11, window="hanning"):
    """
    smooth the data using a window of the requested size.
    This method is based on the convolution of a scaled window on the signal.
    The signal is prepared by introducing reflected copies of the signal
    (with the window size) in both ends so that transient parts are minimized
    in the begining and end part of the output signal.
    input:
    x: the input signal
    window_len: the dimension of the smoothing window; should be an odd
    integer
    window: the type of window from 'flat', 'hanning', 'hamming',
    'bartlett', 'blackman'
    flat window will produce a moving average smoothing.
    in the beginning and end part of the output signal.
    keyword arguments:
    x -- the input signal
    window_len -- the dimension of the smoothing window; should be an odd
    integer (default: 11)
    window -- the type of window from 'flat', 'hanning', 'hamming',
    'bartlett', 'blackman', where flat is a moving average
    (default: 'hanning')
    output:
    the smoothed signal
    return: the smoothed signal
    example:
    @@ -588,55 +700,62 @@ def _smooth(x, window_len=11, window='hanning'):
    see also:
    numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman,
    numpy.convolve, scipy.signal.lfilter
    TODO: the window parameter could be the window itself if a list instead of
    a string
    numpy.convolve, scipy.signal.lfilter
    """
    if x.ndim != 1:
    raise ValueError, "smooth only accepts 1 dimension arrays."
    raise ValueError("smooth only accepts 1 dimension arrays.")

    if x.size < window_len:
    raise ValueError, "Input vector needs to be bigger than window size."
    raise ValueError("Input vector needs to be bigger than window size.")

    if window_len<3:
    return x
    #declare valid windows in a dictionary
    window_funcs = {
    "flat": lambda _len: np.ones(_len, "d"),
    "hanning": np.hanning,
    "hamming": np.hamming,
    "bartlett": np.bartlett,
    "blackman": np.blackman
    }

    if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
    raise(ValueError,
    s = np.r_[x[window_len-1:0:-1], x, x[-1:-window_len:-1]]
    try:
    w = window_funcs[window](window_len)
    except KeyError:
    raise ValueError(
    "Window is not one of '{0}', '{1}', '{2}', '{3}', '{4}'".format(
    *('flat', 'hanning', 'hamming', 'bartlett', 'blackman')))
    *window_funcs.keys()))

    y = np.convolve(w / w.sum(), s, mode = "valid")

    s = np.r_[x[window_len-1:0:-1], x, x[-1:-window_len:-1]]
    #print(len(s))
    if window == 'flat': #moving average
    w = np.ones(window_len,'d')
    else:
    w = eval('np.' + window + '(window_len)')

    y = np.convolve(w / w.sum(), s, mode = 'valid')
    return y


    def zero_crossings(y_axis, window = 11):
    def zero_crossings(y_axis, window_len = 11, window_f="hanning"):
    """
    Algorithm to find zero crossings. Smoothens the curve and finds the
    Algorithm to find zero crossings. Smooths the curve and finds the
    zero-crossings by looking for a sign change.
    keyword arguments:
    y_axis -- A list containg the signal over which to find zero-crossings
    window -- the dimension of the smoothing window; should be an odd integer
    (default: 11)
    y_axis -- A list containing the signal over which to find zero-crossings
    window_len -- the dimension of the smoothing window; should be an odd
    integer (default: 11)
    return -- the index for each zero-crossing
    window_f -- the type of window from 'flat', 'hanning', 'hamming',
    'bartlett', 'blackman' (default: 'hanning')
    return: the index for each zero-crossing
    """
    # smooth the curve
    length = len(y_axis)
    x_axis = np.asarray(range(length), int)

    # discard tail of smoothed signal
    y_axis = _smooth(y_axis, window)[:length]
    y_axis = _smooth(y_axis, window_len, window_f)[:length]
    zero_crossings = np.where(np.diff(np.sign(y_axis)))[0]
    indices = [x_axis[index] for index in zero_crossings]

    @@ -645,12 +764,12 @@ def zero_crossings(y_axis, window = 11):
    if diff.std() / diff.mean() > 0.2:
    print diff.std() / diff.mean()
    print np.diff(indices)
    raise(ValueError,
    "False zero-crossings found, indicates problem {0} or {1}".format(
    raise ValueError(
    "False zero-crossings found, indicates problem {0!s} or {1!s}".format(
    "with smoothing window", "problem with offset"))
    # check if any zero crossings were found
    if len(zero_crossings) < 1:
    raise(ValueError, "No zero crossings found")
    raise ValueError("No zero crossings found")

    return indices
    # used this to test the fft function's sensitivity to spectral leakage
    @@ -669,7 +788,114 @@ def zero_crossings(y_axis, window = 11):
    ##############################################################################



    def zero_crossings_sine_fit(y_axis, x_axis, fit_window = None, smooth_window = 11):
    """
    Detects the zero crossings of a signal by fitting a sine model function
    around the zero crossings:
    y = A * sin(2 * pi * Hz * (x - tau)) + k * x + m
    Only tau (the zero crossing) is varied during fitting.
    Offset and a linear drift of offset is accounted for by fitting a linear
    function the negative respective positive raw peaks of the wave-shape and
    the amplitude is calculated using data from the offset calculation i.e.
    the 'm' constant from the negative peaks is subtracted from the positive
    one to obtain amplitude.
    Frequency is calculated using the mean time between raw peaks.
    Algorithm seems to be sensitive to first guess e.g. a large smooth_window
    will give an error in the results.
    keyword arguments:
    y_axis -- A list containing the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the y_axis list
    and is used in the return to specify the position of the peaks. If
    omitted an index of the y_axis is used. (default: None)
    fit_window -- Number of points around the approximate zero crossing that
    should be used when fitting the sine wave. Must be small enough that
    no other zero crossing will be seen. If set to none then the mean
    distance between zero crossings will be used (default: None)
    smooth_window -- the dimension of the smoothing window; should be an odd
    integer (default: 11)
    return: A list containing the positions of all the zero crossings.
    """
    # check input data
    x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis)
    #get first guess
    zero_indices = zero_crossings(y_axis, window_len = smooth_window)
    #modify fit_window to show distance per direction
    if fit_window == None:
    fit_window = np.diff(zero_indices).mean() // 3
    else:
    fit_window = fit_window // 2

    #x_axis is a np array, use the indices to get a subset with zero crossings
    approx_crossings = x_axis[zero_indices]



    #get raw peaks for calculation of offsets and frequency
    raw_peaks = peakdetect_zero_crossing(y_axis, x_axis)
    #Use mean time between peaks for frequency
    ext = lambda x: list(zip(*x)[0])
    _diff = map(np.diff, map(ext, raw_peaks))


    Hz = 1 / np.mean(map(np.mean, _diff))
    #Hz = 1 / np.diff(approx_crossings).mean() #probably bad precision


    #offset model function
    offset_func = lambda x, k, m: k * x + m
    k = []
    m = []
    amplitude = []

    for peaks in raw_peaks:
    #get peak data as nparray
    x_data, y_data = map(np.asarray, zip(*peaks))
    #x_data = np.asarray(x_data)
    #y_data = np.asarray(y_data)
    #calc first guess
    A = np.mean(y_data)
    p0 = (0, A)
    popt, pcov = curve_fit(offset_func, x_data, y_data, p0)
    #append results
    k.append(popt[0])
    m.append(popt[1])
    amplitude.append(abs(A))

    #store offset constants
    p_offset = (np.mean(k), np.mean(m))
    A = m[0] - m[1]
    #define model function to fit to zero crossing
    #y = A * sin(2*pi * Hz * (x - tau)) + k * x + m
    func = lambda x, tau: A * np.sin(2 * pi * Hz * (x - tau)) + offset_func(x, *p_offset)


    #get true crossings
    true_crossings = []
    for indice, crossing in zip(zero_indices, approx_crossings):
    p0 = (crossing, )
    subset_start = max(indice - fit_window, 0.0)
    subset_end = min(indice + fit_window + 1, len(x_axis) - 1.0)
    x_subset = np.asarray(x_axis[subset_start:subset_end])
    y_subset = np.asarray(y_axis[subset_start:subset_end])
    #fit
    popt, pcov = curve_fit(func, x_subset, y_subset, p0)

    true_crossings.append(popt[0])


    return true_crossings




    def _test_zero():
    @@ -694,16 +920,35 @@ def _test_graph():

    plot = pylab.plot(x,y)
    pylab.hold(True)
    pylab.plot(xm, ym, 'r+')
    pylab.plot(xn, yn, 'g+')
    pylab.plot(xm, ym, "r+")
    pylab.plot(xn, yn, "g+")

    _max, _min = peak_det_bad.peakdetect(y, 0.7, x)
    xm = [p[0] for p in _max]
    ym = [p[1] for p in _max]
    xn = [p[0] for p in _min]
    yn = [p[1] for p in _min]
    pylab.plot(xm, ym, 'y*')
    pylab.plot(xn, yn, 'k*')
    pylab.plot(xm, ym, "y*")
    pylab.plot(xn, yn, "k*")
    pylab.show()

def _test_graph_cross(window = 11):
    """
    Visual sanity check for 'zero_crossings_sine_fit'.

    Generates a noisy, inverted sine wave, runs the sine-fit zero crossing
    detection on it and plots the detected crossings on top of the signal.

    keyword arguments:
    window -- the smoothing window passed on as smooth_window (default: 11)
    """
    # ~4.35 periods of a sine wave with small gaussian noise
    i = 10000
    x = np.linspace(0,8.7*pi,i)
    y = (2*np.sin(x) + 0.006 *
        np.random.randn(i))
    # invert the signal; crossings should be unaffected by the sign flip
    y *= -1
    pylab.plot(x,y)
    #pylab.show()


    crossings = zero_crossings_sine_fit(y,x, smooth_window = window)
    # plot every detected crossing at y = 0
    y_cross = [0] * len(crossings)


    plot = pylab.plot(x,y)
    pylab.hold(True)
    pylab.plot(crossings, y_cross, "b+")
    pylab.show()


    @@ -726,8 +971,8 @@ def _test_graph():

    plot = pylab.plot(x, y)
    pylab.hold(True)
    pylab.plot(xm, ym, 'r+')
    pylab.plot(xn, yn, 'g+')
    pylab.plot(xm, ym, "r+")
    pylab.plot(xn, yn, "g+")


    pylab.show()
    45 changes: 45 additions & 0 deletions readme.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,45 @@
    Musings about the peakdetect function by Sixten Bergman


    All the peak detection function in __all__ of peakdetect.py will work on
    consistent waveforms, but only peakdetect.peakdetect can properly handle
    offsets.

    The most accurate method for pure sine seems to be peakdetect_parabola,
    which for a 50Hz sine wave lasting 0.1s with 10k samples has an error in
    the order of 1e-10, whilst a naive most extreme sample will have an error
    in the order of 7e-5 for the position and 4e-7 for the amplitude

    Do note that this accuracy most likely doesn't stay true for any real world
    data where you'll have noise and harmonics in the signal which may produce
errors in the functions, which may be smaller or larger than the error of
    naively using the highest/lowest point in a local maxima/minima.


The sine fit function seems to perform even worse than just retrieving the
    highest or lowest data point and is as such not recommended. The reason for
    this as far as I can tell is that the scipy.optimize.curve_fit can't optimize
    the variables.


    For parabola fit to function well it must be fitted to a small section of the
    peak as the curvature will start to mismatch with the function, but this also
    means that parabola should be quite sensitive to noise


    FFT interpolation has between 0 to 2 orders of magnitude improvement over a
    raw peak fit. To obtain this improvement the wave needs to be heavily padded
    in length

    Spline seems to have similar performance to a FFT interpolation of the time domain.
    Spline does however seem to be better at estimating amplitude than the FFT method,
but it is unknown whether this will hold true for wave-shapes that are noisy.


Automatic tests for sine fitted peak detection are disabled due to its problems



Avoid using the following functions as they're questionable in performance:
    peakdetect_sine
    peakdetect_sine_locked
    467 changes: 467 additions & 0 deletions test.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,467 @@
    #!/usr/bin/python2


    # Copyright (C) 2016 Sixten Bergman
    # License WTFPL
    #
    # This program is free software. It comes without any warranty, to the extent
    # permitted by applicable law.
    # You can redistribute it and/or modify it under the terms of the Do What The
    # Fuck You Want To Public License, Version 2, as published by Sam Hocevar. See
    # http://www.wtfpl.net/ for more details.
    #

    import analytic_wfm
    import numpy as np
    import peakdetect
    import unittest
    import pdb

    #generate time axis for 5 cycles @ 50 Hz
    linspace_standard = np.linspace(0, 0.10, 1000)
    linspace_peakdetect = np.linspace(0, 0.10, 10000)


    def _write_log(file, header, message):
    with open(file, 'ab') as f:
    f.write(header)
    f.write('\n')
    f.writelines(message)
    f.write('\n')
    f.write('\n')


    def _calculate_missmatch(received, expected):
    """
    Calculates the mean mismatch between received and expected data
    keyword arguments:
    received -- [[time of peak], [ampl of peak]]
    expected -- [[time of peak], [ampl of peak]]
    return (time mismatch, ampl mismatch)
    """
    #t_diff = np.abs(np.asarray(received[0]) - expected[0])
    t_diff = np.asarray(received[0]) - expected[0]
    a_diff = np.abs(np.asarray(received[1]) - expected[1])


    #t_diff /= np.abs(expected[0]) time error in absolute terms
    a_diff /= np.abs(expected[1])

    return (t_diff, a_diff)


    def _log_diff(t_max, y_max,
    t_min, y_min,
    t_max_expected, y_max_expected,
    t_min_expected, y_min_expected,
    file, name
    ):
    """
    """
    t_diff_h, a_diff_h = _calculate_missmatch([t_max, y_max],
    [t_max_expected, y_max_expected])


    t_diff_l, a_diff_l = _calculate_missmatch([t_min, y_min],
    [t_min_expected, y_min_expected])

    #data = ['\t{0:.2e}\t{1:.2e}\t{2:.2e}\t{3:.2e}'.format(*d) for d in
    # [t_diff_h, t_diff_l, a_diff_h, a_diff_l]
    # ]
    data = ['\t{0}'.format('\t'.join(map('{0:.2e}'.format, d))) for d in
    [t_diff_h, t_diff_l, a_diff_h, a_diff_l]
    ]

    _write_log(file, name, '\n'.join(data))



def _is_close(max_p, min_p,
        expected_max, expected_min,
        atol_time, tol_ampl,
        file, name):
    """
    Determines if the peaks are within the given tolerance

    keyword arguments:
    max_p -- location and value of maxima
    min_p -- location and value of minima
    expected_max -- expected location and value of maxima
    expected_min -- expected location and value of minima
    atol_time -- absolute tolerance of location of vertex
    tol_ampl -- relative tolerance of value of vertex
    file -- path of the log file the mismatch is appended to
    name -- header written above the mismatch data in the log file

    return: tuple of boolean arrays
        (t_max_close, y_max_close, t_min_close, y_min_close)
    """
    # NOTE(review): assumes the expectation lists hold 5 entries and that a
    # detector returning fewer than 5 maxima missed the first one, and fewer
    # than 5 minima missed the last one -- TODO confirm against the waveform
    # fixtures used by the callers
    if len(max_p) == 5:
        t_max_expected, y_max_expected = zip(*expected_max)
    else:
        t_max_expected, y_max_expected = zip(*expected_max[1:])
    if len(min_p) == 5:
        t_min_expected, y_min_expected = zip(*expected_min)
    else:
        t_min_expected, y_min_expected = zip(*expected_min[:-1])

    # unpack (position, value) pairs into separate sequences
    t_max, y_max = zip(*max_p)
    t_min, y_min = zip(*min_p)

    # positions compared with an absolute tolerance (rtol made negligible),
    # amplitudes with a relative tolerance (tol_ampl is np.isclose's rtol)
    t_max_close = np.isclose(t_max, t_max_expected, atol=atol_time, rtol=1e-12)
    y_max_close = np.isclose(y_max, y_max_expected, tol_ampl)
    t_min_close = np.isclose(t_min, t_min_expected, atol=atol_time, rtol=1e-12)
    y_min_close = np.isclose(y_min, y_min_expected, tol_ampl)

    # record the mismatch in the log file regardless of the outcome
    _log_diff(t_max, y_max, t_min, y_min,
              t_max_expected, y_max_expected,
              t_min_expected, y_min_expected,
              file, name)

    return(t_max_close, y_max_close, t_min_close, y_min_close)




    class Test_analytic_wfm(unittest.TestCase):
    def test_ACV1(self):
    #compare with previous lambda implementation
    old = analytic_wfm._ACV_A1_L(linspace_standard)
    acv = analytic_wfm.ACV_A1(linspace_standard)

    self.assertTrue(np.allclose(acv, old, rtol=1e-9))

    def test_ACV2(self):
    #compare with previous lambda implementation
    old = analytic_wfm._ACV_A2_L(linspace_standard)
    acv = analytic_wfm.ACV_A2(linspace_standard)

    self.assertTrue(np.allclose(acv, old, rtol=1e-9))

    def test_ACV3(self):
    #compare with previous lambda implementation
    old = analytic_wfm._ACV_A3_L(linspace_standard)
    acv = analytic_wfm.ACV_A3(linspace_standard)

    self.assertTrue(np.allclose(acv, old, rtol=1e-9))

    def test_ACV4(self):
    #compare with previous lambda implementation
    old = analytic_wfm._ACV_A4_L(linspace_standard)
    acv = analytic_wfm.ACV_A4(linspace_standard)

    self.assertTrue(np.allclose(acv, old, rtol=1e-9))

    def test_ACV5(self):
    #compare with previous lambda implementation
    old = analytic_wfm._ACV_A5_L(linspace_standard)
    acv = analytic_wfm.ACV_A5(linspace_standard)

    self.assertTrue(np.allclose(acv, old, rtol=1e-9))

    def test_ACV6(self):
    #compare with previous lambda implementation
    old = analytic_wfm._ACV_A6_L(linspace_standard)
    acv = analytic_wfm.ACV_A6(linspace_standard)

    self.assertTrue(np.allclose(acv, old, rtol=1e-9))


    def test_ACV7(self):
    num = np.linspace(0, 20, 1000)
    old = analytic_wfm._ACV_A7_OLD(num)
    acv = analytic_wfm.ACV_A7(num)

    self.assertTrue(np.allclose(acv, old, rtol=1e-9))


    def test_ACV8(self):
    num = np.linspace(0, 3150, 10000)
    old = analytic_wfm._ACV_A8_OLD(num)
    acv = analytic_wfm.ACV_A8(num)

    self.assertTrue(np.allclose(acv, old, rtol=1e-9))




class _Test_peakdetect_template(unittest.TestCase):
    """
    Template for testing a peak detection function against analytic
    waveforms.

    Subclasses bind 'func' to the peak detection function under test and
    set 'name' to a readable label; the test_peak_* methods generate a
    waveform, run the detector and compare the peaks found against the
    analytically expected positions and amplitudes.
    """
    # peak detection function under test, bound by each subclass
    func = None
    # log file that collects the mismatch data of every test run
    file = "Mismatch data.txt"
    # readable label of the function under test, used in the log
    name = "template"
    # extra positional and keyword arguments forwarded to func
    args = []
    kwargs = {}
    # failure message templates for the time and amplitude assertions
    msg_t = "Time of {0!s} not within tolerance:\n\t{1}"
    msg_y = "Amplitude of {0!s} not within tolerance:\n\t{1}"

    def _test_peak_template(self, waveform,
                            expected_max, expected_min,
                            wav_name,
                            atol_time = 1e-5, tol_ampl = 1e-5):
        """
        Generate a waveform, detect its peaks with self.func and assert
        that every peak lies within tolerance of the expected values.

        keyword arguments:
        waveform -- a function that given x can generate a test waveform
        expected_max -- position and amplitude where maxima are expected
        expected_min -- position and amplitude where minima are expected
        wav_name -- Name of the test waveform
        atol_time -- absolute tolerance for position of vertex (default: 1e-5)
        tol_ampl -- relative tolerance for amplitude of vertex (default: 1e-5)
        """

        y = waveform(linspace_peakdetect)
        max_p, min_p = self.func(y, linspace_peakdetect,
                                 *self.args, **self.kwargs
                                 )
        #check if the correct amount of peaks were discovered
        #(4 is accepted since a detector may lose an edge peak,
        # which _is_close compensates for)
        self.assertIn(len(max_p), [4,5])
        self.assertIn(len(min_p), [4,5])

        #
        # check if position and amplitude is within 0.001% which is approx the
        # numeric uncertainty from the amount of samples used
        #
        t_max_close, y_max_close, t_min_close, y_min_close = _is_close(max_p,
                min_p,
                expected_max,
                expected_min,
                atol_time, tol_ampl,
                self.file, "{0}: {1}".format(wav_name, self.name))

        #assert if values are outside of tolerance
        self.assertTrue(np.all(t_max_close),
                        msg=self.msg_t.format("maxima", t_max_close))
        self.assertTrue(np.all(y_max_close),
                        msg=self.msg_y.format("maxima", y_max_close))

        self.assertTrue(np.all(t_min_close),
                        msg=self.msg_t.format("minima", t_min_close))
        self.assertTrue(np.all(y_min_close),
                        msg=self.msg_y.format("minima", y_min_close))


    def test_peak_ACV1(self):
        # expected extrema of ACV1: constant magnitude 1000*sqrt(2),
        # one maximum/minimum pair per 0.02 period
        peak_pos = 1000*np.sqrt(2) #1414.2135623730951
        peak_neg = -peak_pos
        expected_max = [
            [0.005, peak_pos],
            [0.025, peak_pos],
            [0.045, peak_pos],
            [0.065, peak_pos],
            [0.085, peak_pos]
            ]
        expected_min = [
            (0.015, peak_neg),
            (0.035, peak_neg),
            (0.055, peak_neg),
            (0.075, peak_neg),
            (0.095, peak_neg)
            ]
        atol_time = 1e-5
        tol_ampl = 1e-6

        self._test_peak_template(analytic_wfm.ACV_A1,
                                 expected_max, expected_min,
                                 "ACV1",
                                 atol_time, tol_ampl
                                 )

    # NOTE(review): leading underscore keeps this test disabled
    # (unittest only discovers methods named test_*)
    def _test_peak_ACV2(self):
        # same waveform shape as ACV1 but with a +500 offset
        peak_pos = 1000*np.sqrt(2) + 500 #1414.2135623730951 + 500
        peak_neg = (-1000*np.sqrt(2)) + 500 #-914.2135623730951
        expected_max = [
            [0.005, peak_pos],
            [0.025, peak_pos],
            [0.045, peak_pos],
            [0.065, peak_pos],
            [0.085, peak_pos]
            ]
        expected_min = [
            (0.015, peak_neg),
            (0.035, peak_neg),
            (0.055, peak_neg),
            (0.075, peak_neg),
            (0.095, peak_neg)
            ]
        atol_time = 1e-5
        tol_ampl = 1e-6

        self._test_peak_template(analytic_wfm.ACV_A2,
                                 expected_max, expected_min,
                                 "ACV2",
                                 atol_time, tol_ampl
                                 )


    def test_peak_ACV3(self):
        """
        WolframAlpha solution
        max{y = sin(100 pi x)+0.05 sin(400 pi x+(2 pi)/3)}~~
            sin(6.28319 n+1.51306)-0.05 sin(25.1327 n+5.00505)
            at x~~0.00481623+0.02 n for integer n
        min{y = sin(100 pi x)+0.05 sin(400 pi x+(2 pi)/3)}~~
            0.05 sin(6.55488-25.1327 n)-sin(1.37692-6.28319 n)
            at x~~-0.00438287+0.02 n for integer n

        Derivative for 50 Hz in 2 alternative forms
        y = 100pi*cos(100pi*x) - 25pi*cos(400pi*x)-0.3464*50*pi*sin(400pi*x)
        y = 100pi*cos(100pi*x) + 20pi*cos(400pi*x + 2*pi/3)

        root 0 = 1/(50 * pi) * (pi*0 - 0.68846026579266880983)
        The exact solution according to WolframAlpha - I haven't the foggiest
        (tan^(-1)(root of
        {#1^2-3&, 11 #2^8-8 #1 #2^7-8 #2^6+56 #1 #2^5+70 #2^4-56 #1 #2^3-48 #2^2+8 #1 #2-9&}(x)
        near x = -0.822751)+pi n) / (50 * pi)
        root 1 = 1/(50 * pi) * (pi*0 + 0.75653155241276430710)

        period = 0.02
        """
        base = 1000*np.sqrt(2)

        #def peak_pos(n):
        #    return base * (np.sin(6.28319 * n + 1.51306)
        #        -0.05*np.sin(25.1327 * n + 5.00505))
        #def peak_neg(n):
        #    return base * (0.05 * np.sin(6.55488 - 25.1327 * n)
        #        - np.sin(1.37692 - 6.28319 * n))


        # NOTE(review): peak_pos and peak_neg are currently unused; the
        # expected amplitudes are evaluated directly with ACV_A3 below
        def peak_pos(n):
            return base * (np.sin(2*np.pi * n + 1.51306)
                -0.05*np.sin(8*np.pi * n + 5.00505))
        def peak_neg(n):
            return base * (0.05 * np.sin(6.55488 - 8*np.pi * n)
                - np.sin(1.37692 - 2*np.pi * n))

        # peak times: the WolframAlpha roots offset by whole 0.02 s periods
        t_max = [
            0.75653155241276430710/(50*np.pi)+0.00,#0.004816229446859069
            0.75653155241276430710/(50*np.pi)+0.02,#0.024816229446859069
            0.75653155241276430710/(50*np.pi)+0.04,#0.044816229446859069
            0.75653155241276430710/(50*np.pi)+0.06,#0.064816229446859069
            0.75653155241276430710/(50*np.pi)+0.08 #0.084816229446859069
            ]
        t_min = [
            -0.68846026579266880983/(50*np.pi)+0.02,#0.015617125823069466
            -0.68846026579266880983/(50*np.pi)+0.04,#0.035617125823069466
            -0.68846026579266880983/(50*np.pi)+0.06,#0.055617125823069466
            -0.68846026579266880983/(50*np.pi)+0.08,#0.075617125823069466
            -0.68846026579266880983/(50*np.pi)+0.10 #0.095617125823069466
            ]

        expected_max = [
            (t_max[0], analytic_wfm.ACV_A3(t_max[0])),
            (t_max[1], analytic_wfm.ACV_A3(t_max[1])),
            (t_max[2], analytic_wfm.ACV_A3(t_max[2])),
            (t_max[3], analytic_wfm.ACV_A3(t_max[3])),
            (t_max[4], analytic_wfm.ACV_A3(t_max[4])),
            ]
        expected_min = [
            (t_min[0], analytic_wfm.ACV_A3(t_min[0])),
            (t_min[1], analytic_wfm.ACV_A3(t_min[1])),
            (t_min[2], analytic_wfm.ACV_A3(t_min[2])),
            (t_min[3], analytic_wfm.ACV_A3(t_min[3])),
            (t_min[4], analytic_wfm.ACV_A3(t_min[4])),
            ]
        atol_time = 1e-5
        tol_ampl = 2e-6
        #reduced tolerance since the expected values are only approximated

        self._test_peak_template(analytic_wfm.ACV_A3,
                                 expected_max, expected_min,
                                 "ACV3",
                                 atol_time, tol_ampl
                                 )



    class Test_peakdetect(_Test_peakdetect_template):
    name = "peakdetect"
    def __init__(self, *args, **kwargs):
    super(Test_peakdetect, self).__init__(*args, **kwargs)
    self.func = peakdetect.peakdetect


    class Test_peakdetect_fft(_Test_peakdetect_template):
    name = "peakdetect_fft"
    def __init__(self, *args, **kwargs):
    super(Test_peakdetect_fft, self).__init__(*args, **kwargs)
    self.func = peakdetect.peakdetect_fft


    class Test_peakdetect_parabola(_Test_peakdetect_template):
    name = "peakdetect_parabola"
    def __init__(self, *args, **kwargs):
    super(Test_peakdetect_parabola, self).__init__(*args, **kwargs)
    self.func = peakdetect.peakdetect_parabola


    class Test_peakdetect_sine(_Test_peakdetect_template):
    name = "peakdetect_sine"
    def __init__(self, *args, **kwargs):
    super(Test_peakdetect_sine, self).__init__(*args, **kwargs)
    self.func = peakdetect.peakdetect_sine


    class Test_peakdetect_sine_locked(_Test_peakdetect_template):
    name = "peakdetect_sine_locked"
    def __init__(self, *args, **kwargs):
    super(Test_peakdetect_sine_locked, self).__init__(*args, **kwargs)
    self.func = peakdetect.peakdetect_sine_locked


    class Test_peakdetect_spline(_Test_peakdetect_template):
    name = "peakdetect_spline"
    def __init__(self, *args, **kwargs):
    super(Test_peakdetect_spline, self).__init__(*args, **kwargs)
    self.func = peakdetect.peakdetect_spline


    class Test_peakdetect_zero_crossing(_Test_peakdetect_template):
    name = "peakdetect_zero_crossing"
    def __init__(self, *args, **kwargs):
    super(Test_peakdetect_zero_crossing, self).__init__(*args, **kwargs)
    self.func = peakdetect.peakdetect_zero_crossing




    class Test_peakdetect_misc(unittest.TestCase):
    def test__pad(self):
    data = [1,2,3,4,5,6,5,4,3,2,1]
    pad_len = 2
    pad = lambda x, c: x[:len(x) // 2] + [0] * c + x[len(x) // 2:]
    expected = pad(list(data), 2 **
    peakdetect._n(len(data) * pad_len) - len(data))
    received = peakdetect._pad(data, pad_len)

    self.assertListEqual(received, expected)
    def test__n(self):
    self.assertEqual(2**peakdetect._n(1000), 1024)





    #class zero_crossings(unittest.TestCase):




    if __name__ == '__main__':
    tests_to_run = [
    Test_analytic_wfm,
    Test_peakdetect,
    Test_peakdetect_parabola,
    Test_peakdetect_fft,
    #Test_peakdetect_sine, #sine tests disabled pending rework
    #Test_peakdetect_sine_locked,
    Test_peakdetect_spline,
    Test_peakdetect_zero_crossing,
    Test_peakdetect_misc
    ]

    suites_list = [unittest.TestLoader().loadTestsFromTestCase(test_class) for test_class in tests_to_run]
    big_suite = unittest.TestSuite(suites_list)
    unittest.TextTestRunner(verbosity=2).run(big_suite)
  4. sixtenbe revised this gist Mar 1, 2012. 1 changed file with 13 additions and 9 deletions.
    22 changes: 13 additions & 9 deletions peakdetect.py
    Original file line number Diff line number Diff line change
    @@ -133,7 +133,7 @@ def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0):
    #Maxima peak candidate found
    #look ahead in signal to ensure that this is a peak and not jitter
    if y_axis[index:index+lookahead].max() < mx:
    max_peaks.append((mxpos, mx))
    max_peaks.append([mxpos, mx])
    dump.append(True)
    #set algorithm to only find minima now
    mx = np.Inf
    @@ -151,7 +151,7 @@ def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0):
    #Minima peak candidate found
    #look ahead in signal to ensure that this is a peak and not jitter
    if y_axis[index:index+lookahead].min() > mn:
    min_peaks.append((mnpos, mn))
    min_peaks.append([mnpos, mn])
    dump.append(False)
    #set algorithm to only find maxima now
    mn = -np.Inf
    @@ -175,7 +175,7 @@ def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0):
    #no peaks were found, should the function return empty lists?
    pass

    return max_peaks, min_peaks
    return [max_peaks, min_peaks]


    def peakdetect_fft(y_axis, x_axis, pad_len = 5):
    @@ -234,7 +234,7 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 5):
    fft_padded = padd(list(fft_data), 2 **
    n(len(fft_data) * pad_len) - len(fft_data))

    # amplitude of ifft is decreased, get sf to return to original amplitude
    # There is amplitude decrease directly proportional to the sample increase
    sf = len(fft_padded) / float(len(fft_data))
    # There might be a leakage giving the result an imaginary component
    # Return only the real component
    @@ -243,7 +243,8 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 5):
    x_axis[zero_indices[0]], x_axis[zero_indices[last_indice]],
    len(y_axis_ifft))
    # get the peaks to the interpolated waveform
    max_peaks, min_peaks = peakdetect(y_axis_ifft, x_axis_ifft, 500, delta = abs(np.diff(y_axis).max() * 2))
    max_peaks, min_peaks = peakdetect(y_axis_ifft, x_axis_ifft, 500,
    delta = abs(np.diff(y_axis).max() * 2))
    #max_peaks, min_peaks = peakdetect_zero_crossing(y_axis_ifft, x_axis_ifft)

    # store one 20th of a period as waveform data
    @@ -257,8 +258,10 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 5):
    index = 0
    for peak in peaks:
    index = np.where(x_axis_ifft[index:]==peak[0])[0][0] + index
    x_fit_lim = x_axis_ifft[index - data_len // 2: index + data_len // 2 + 1]
    y_fit_lim = y_axis_ifft[index - data_len // 2: index + data_len // 2 + 1]
    x_fit_lim = x_axis_ifft[index - data_len // 2:
    index + data_len // 2 + 1]
    y_fit_lim = y_axis_ifft[index - data_len // 2:
    index + data_len // 2 + 1]

    peak_fit_tmp.append([x_fit_lim, y_fit_lim])
    fitted_wave.append(peak_fit_tmp)
    @@ -404,7 +407,8 @@ def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False):
    if lock_frequency:
    func = lambda x, A, tau: A * np.sin(2 * pi * Hz * (x - tau) + pi / 2)
    else:
    func = lambda x, A, Hz, tau: A * np.sin(2 * pi * Hz * (x - tau) + pi / 2)
    func = lambda x, A, Hz, tau: A * np.sin(2 * pi * Hz * (x - tau) +
    pi / 2)
    #func = lambda x, A, Hz, tau: A * np.cos(2 * pi * Hz * (x - tau))


    @@ -552,7 +556,7 @@ def peakdetect_zero_crossing(y_axis, x_axis = None, window = 11):
    max_peaks = [[x, y] for x,y in zip(hi_peaks_x, hi_peaks)]
    min_peaks = [[x, y] for x,y in zip(lo_peaks_x, lo_peaks)]

    return max_peaks, min_peaks
    return [max_peaks, min_peaks]


    def _smooth(x, window_len=11, window='hanning'):
  5. sixtenbe revised this gist Mar 1, 2012. 1 changed file with 579 additions and 129 deletions.
    708 changes: 579 additions & 129 deletions peakdetect.py
    Original file line number Diff line number Diff line change
    @@ -1,59 +1,126 @@
    import numpy as np
    from math import pi, log
    import pylab
    from scipy import fft, ifft
    from scipy.optimize import curve_fit

    def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0):
    i = 10000
    x = np.linspace(0, 3.5 * pi, i)
    y = (0.3*np.sin(x) + np.sin(1.3 * x) + 0.9 * np.sin(4.2 * x) + 0.06 *
    np.random.randn(i))


    def _datacheck_peakdetect(x_axis, y_axis):
    if x_axis is None:
    x_axis = range(len(y_axis))

    if len(y_axis) != len(x_axis):
    raise (ValueError,
    'Input vectors y_axis and x_axis must have same length')

    #needs to be a numpy array
    y_axis = np.array(y_axis)
    x_axis = np.array(x_axis)
    return x_axis, y_axis

    def _peakdetect_parabole_fitter(raw_peaks, x_axis, y_axis, points):
    """
    Converted from/based on a MATLAB script at http://billauer.co.il/peakdet.html
    Performs the actual parabole fitting for the peakdetect_parabole function.
    keyword arguments:
    raw_peaks -- A list of either the maximium or the minimum peaks, as given
    by the peakdetect_zero_crossing function, with index used as x-axis
    x_axis -- A numpy list of all the x values
    y_axis -- A numpy list of all the y values
    points -- How many points around the peak should be used during curve
    fitting, must be odd.
    return -- A list giving all the peaks and the fitted waveform, format:
    [[x, y, [fitted_x, fitted_y]]]
    """
    func = lambda x, k, tau, m: k * ((x - tau) ** 2) + m
    fitted_peaks = []
    for peak in raw_peaks:
    index = peak[0]
    x_data = x_axis[index - points // 2: index + points // 2 + 1]
    y_data = y_axis[index - points // 2: index + points // 2 + 1]
    # get a first approximation of tau (peak position in time)
    tau = x_axis[index]
    # get a first approximation of peak amplitude
    m = peak[1]

    # build list of approximations
    # k = -m as first approximation?
    p0 = (-m, tau, m)
    popt, pcov = curve_fit(func, x_data, y_data, p0)
    # retrieve tau and m i.e x and y value of peak
    x, y = popt[1:3]

    # create a high resolution data set for the fitted waveform
    x2 = np.linspace(x_data[0], x_data[-1], points * 10)
    y2 = func(x2, *popt)

    fitted_peaks.append([x, y, [x2, y2]])

    return fitted_peaks


    Algorithm for detecting local maximas and minmias in a signal.
    def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0):
    """
    Converted from/based on a MATLAB script at:
    http://billauer.co.il/peakdet.html
    function for detecting local maximas and minmias in a signal.
    Discovers peaks by searching for values which are surrounded by lower
    or larger values for maximas and minimas respectively
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the 'y_axis' list and is used
    in the return to specify the postion of the peaks. If omitted the index
    of the y_axis is used. (default: None)
    x_axis -- (optional) A x-axis whose values correspond to the y_axis list
    and is used in the return to specify the postion of the peaks. If
    omitted an index of the y_axis is used. (default: None)
    lookahead -- (optional) distance to look ahead from a peak candidate to
    determine if it is the actual peak (default: 500)
    determine if it is the actual peak (default: 200)
    '(sample / period) / f' where '4 >= f >= 1.25' might be a good value
    delta -- (optional) this specifies a minimum difference between a peak and
    the following points, before a peak may be considered a peak. Useful
    to hinder the algorithm from picking up false peaks towards to end of
    the signal. To work well delta should be set to 'delta >= RMSnoise * 5'.
    to hinder the function from picking up false peaks towards to end of
    the signal. To work well delta should be set to delta >= RMSnoise * 5.
    (default: 0)
    Delta function causes a 20% decrease in speed, when omitted
    Correctly used it can double the speed of the algorithm
    return -- two lists [maxtab, mintab] containing the positive and negative
    peaks respectively. Each cell of the lists contains a tupple of:
    (position, peak_value)
    to get the average peak value do 'np.mean(maxtab, 0)[1]' on the results
    delta function causes a 20% decrease in speed, when omitted
    Correctly used it can double the speed of the function
    return -- two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tupple
    of: (position, peak_value)
    to get the average peak value do: np.mean(max_peaks, 0)[1] on the
    results to unpack one of the lists into x, y coordinates do:
    x, y = zip(*tab)
    """
    maxtab = []
    mintab = []
    dump = [] #Used to pop the first hit which always if false
    max_peaks = []
    min_peaks = []
    dump = [] #Used to pop the first hit which almost always is false

    # check input data
    x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis)
    # store data length for later use
    length = len(y_axis)
    if x_axis is None:
    x_axis = range(length)


    #perform some checks
    if length != len(x_axis):
    raise ValueError, "Input vectors y_axis and x_axis must have same length"
    if lookahead < 1:
    raise ValueError, "Lookahead must be above '1' in value"
    raise ValueError, "Lookahead must be '1' or above in value"
    if not (np.isscalar(delta) and delta >= 0):
    raise ValueError, "delta must be a positive number"

    #needs to be a numpy array
    y_axis = np.asarray(y_axis)

    #maxima and minima candidates are temporarily stored in
    #mx and mn respectively
    mn, mx = np.Inf, -np.Inf

    #Only detect peak if there is 'lookahead' amount of points after it
    for index, (x, y) in enumerate(zip(x_axis[:-lookahead], y_axis[:-lookahead])):
    for index, (x, y) in enumerate(zip(x_axis[:-lookahead],
    y_axis[:-lookahead])):
    if y > mx:
    mx = y
    mxpos = x
    @@ -66,204 +133,552 @@ def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0):
    #Maxima peak candidate found
    #look ahead in signal to ensure that this is a peak and not jitter
    if y_axis[index:index+lookahead].max() < mx:
    maxtab.append((mxpos, mx))
    max_peaks.append((mxpos, mx))
    dump.append(True)
    #set algorithm to only find minima now
    mx = np.Inf
    mn = np.Inf
    if index+lookahead >= length:
    #end is within lookahead no more peaks can be found
    break
    continue
    #else: #slows shit down this does
    # mx = ahead
    # mxpos = x_axis[np.where(y_axis[index:index+lookahead]==mx)]

    ####look for min####
    if y > mn+delta and mn != -np.Inf:
    #Minima peak candidate found
    #look ahead in signal to ensure that this is a peak and not jitter
    if y_axis[index:index+lookahead].min() > mn:
    mintab.append((mnpos, mn))
    min_peaks.append((mnpos, mn))
    dump.append(False)
    #set algorithm to only find maxima now
    mn = -np.Inf
    mx = -np.Inf
    if index+lookahead >= length:
    #end is within lookahead no more peaks can be found
    break
    #else: #slows shit down this does
    # mn = ahead
    # mnpos = x_axis[np.where(y_axis[index:index+lookahead]==mn)]


    #Remove the false hit on the first value of the y_axis
    try:
    if dump[0]:
    maxtab.pop(0)
    #print "pop max"
    max_peaks.pop(0)
    else:
    mintab.pop(0)
    #print "pop min"
    min_peaks.pop(0)
    del dump
    except IndexError:
    #no peaks were found, should the function return empty lists?
    pass

    return max_peaks, min_peaks


    def peakdetect_fft(y_axis, x_axis, pad_len = 5):
    """
    Performs a FFT calculation on the data and zero-pads the results to
    increase the time domain resolution after performing the inverse fft and
    send the data to the 'peakdetect' function for peak
    detection.
    Omitting the x_axis is forbidden as it would make the resulting x_axis
    value silly if it was returned as the index 50.234 or similar.
    Will find at least 1 less peak then the 'peakdetect_zero_crossing'
    function, but should result in a more precise value of the peak as
    resolution has been increased. Some peaks are lost in an attempt to
    minimize spectral leakage by calculating the fft between two zero
    crossings for n amount of signal periods.
    The biggest time eater in this function is the ifft and thereafter it's
    the 'peakdetect' function which takes only half the time of the ifft.
    Speed improvementd could include to check if 2**n points could be used for
    fft and ifft or change the 'peakdetect' to the 'peakdetect_zero_crossing',
    which is maybe 10 times faster than 'peakdetct'. The pro of 'peakdetect'
    is that it resutls in one less lost peak. It should also be noted that the
    time used by the ifft function can change greatly depending on the input.
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the y_axis list and is used
    in the return to specify the postion of the peaks.
    pad_len -- (optional) By how many times the time resolution should be
    increased by, e.g. 1 doubles the resolution. The amount is rounded up
    to the nearest 2 ** n amount (default: 5)
    return -- two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tupple
    of: (position, peak_value)
    to get the average peak value do: np.mean(max_peaks, 0)[1] on the
    results to unpack one of the lists into x, y coordinates do:
    x, y = zip(*tab)
    """
    # check input data
    x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis)
    zero_indices = zero_crossings(y_axis, window = 11)
    #select a n amount of periods
    last_indice = - 1 - (1 - len(zero_indices) & 1)
    # Calculate the fft between the first and last zero crossing
    # this method could be ignored if the begining and the end of the signal
    # are discardable as any errors induced from not using whole periods
    # should mainly manifest in the beginning and the end of the signal, but
    # not in the rest of the signal
    fft_data = fft(y_axis[zero_indices[0]:zero_indices[last_indice]])
    padd = lambda x, c: x[:len(x) // 2] + [0] * c + x[len(x) // 2:]
    n = lambda x: int(log(x)/log(2)) + 1
    # padds to 2**n amount of samples
    fft_padded = padd(list(fft_data), 2 **
    n(len(fft_data) * pad_len) - len(fft_data))

    # amplitude of ifft is decreased, get sf to return to original amplitude
    sf = len(fft_padded) / float(len(fft_data))
    # There might be a leakage giving the result an imaginary component
    # Return only the real component
    y_axis_ifft = ifft(fft_padded).real * sf #(pad_len + 1)
    x_axis_ifft = np.linspace(
    x_axis[zero_indices[0]], x_axis[zero_indices[last_indice]],
    len(y_axis_ifft))
    # get the peaks to the interpolated waveform
    max_peaks, min_peaks = peakdetect(y_axis_ifft, x_axis_ifft, 500, delta = abs(np.diff(y_axis).max() * 2))
    #max_peaks, min_peaks = peakdetect_zero_crossing(y_axis_ifft, x_axis_ifft)

    # store one 20th of a period as waveform data
    data_len = int(np.diff(zero_indices).mean()) / 10
    data_len += 1 - data_len & 1


    fitted_wave = []
    for peaks in [max_peaks, min_peaks]:
    peak_fit_tmp = []
    index = 0
    for peak in peaks:
    index = np.where(x_axis_ifft[index:]==peak[0])[0][0] + index
    x_fit_lim = x_axis_ifft[index - data_len // 2: index + data_len // 2 + 1]
    y_fit_lim = y_axis_ifft[index - data_len // 2: index + data_len // 2 + 1]

    peak_fit_tmp.append([x_fit_lim, y_fit_lim])
    fitted_wave.append(peak_fit_tmp)

    #pylab.plot(range(len(fft_data)), fft_data)
    #pylab.show()

    pylab.plot(x_axis, y_axis)
    pylab.hold(True)
    pylab.plot(x_axis_ifft, y_axis_ifft)
    #for max_p in max_peaks:
    # pylab.plot(max_p[0], max_p[1], 'xr')
    pylab.show()
    return [max_peaks, min_peaks]


    def peakdetect_parabole(y_axis, x_axis, points = 9):
    """
    Function for detecting local maximas and minmias in a signal.
    Discovers peaks by fitting the model function: y = k (x - tau) ** 2 + m
    to the peaks. The amount of points used in the fitting is set by the
    points argument.
    Omitting the x_axis is forbidden as it would make the resulting x_axis
    value silly if it was returned as index 50.234 or similar.
    will find the same amount of peaks as the 'peakdetect_zero_crossing'
    function, but might result in a more precise value of the peak.
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the y_axis list and is used
    in the return to specify the postion of the peaks.
    points -- (optional) How many points around the peak should be used during
    curve fitting, must be odd (default: 9)
    return -- two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a list
    of: (position, peak_value)
    to get the average peak value do: np.mean(max_peaks, 0)[1] on the
    results to unpack one of the lists into x, y coordinates do:
    x, y = zip(*max_peaks)
    """
    # check input data
    x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis)
    # make the points argument odd
    points += 1 - points % 2
    #points += 1 - int(points) & 1 slower when int conversion needed

    # get raw peaks
    max_raw, min_raw = peakdetect_zero_crossing(y_axis)

    # define output variable
    max_peaks = []
    min_peaks = []

    max_ = _peakdetect_parabole_fitter(max_raw, x_axis, y_axis, points)
    min_ = _peakdetect_parabole_fitter(min_raw, x_axis, y_axis, points)

    max_peaks = map(lambda x: [x[0], x[1]], max_)
    max_fitted = map(lambda x: x[-1], max_)
    min_peaks = map(lambda x: [x[0], x[1]], min_)
    min_fitted = map(lambda x: x[-1], min_)


    #pylab.plot(x_axis, y_axis)
    #pylab.hold(True)
    #for max_p, max_f in zip(max_peaks, max_fitted):
    # pylab.plot(max_p[0], max_p[1], 'x')
    # pylab.plot(max_f[0], max_f[1], 'o', markersize = 2)
    #for min_p, min_f in zip(min_peaks, min_fitted):
    # pylab.plot(min_p[0], min_p[1], 'x')
    # pylab.plot(min_f[0], min_f[1], 'o', markersize = 2)
    #pylab.show()

    return [max_peaks, min_peaks]

    return maxtab, mintab


    def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False):
    """
    Function for detecting local maximas and minmias in a signal.
    Discovers peaks by fitting the model function:
    y = A * sin(2 * pi * f * x - tau) to the peaks. The amount of points used
    in the fitting is set by the points argument.
    Omitting the x_axis is forbidden as it would make the resulting x_axis
    value silly if it was returned as index 50.234 or similar.
    will find the same amount of peaks as the 'peakdetect_zero_crossing'
    function, but might result in a more precise value of the peak.
    The function might have some problems if the sine wave has a
    non-negligible total angle i.e. a k*x component, as this messes with the
    internal offset calculation of the peaks, might be fixed by fitting a
    k * x + m function to the peaks for offset calculation.
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the y_axis list and is used
    in the return to specify the postion of the peaks.
    points -- (optional) How many points around the peak should be used during
    curve fitting, must be odd (default: 9)
    lock_frequency -- (optional) Specifies if the frequency argument of the
    model function should be locked to the value calculated from the raw
    peaks or if optimization process may tinker with it. (default: False)
    return -- two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tupple
    of: (position, peak_value)
    to get the average peak value do: np.mean(max_peaks, 0)[1] on the
    results to unpack one of the lists into x, y coordinates do:
    x, y = zip(*tab)
    """
    # check input data
    x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis)
    # make the points argument odd
    points += 1 - points % 2
    #points += 1 - int(points) & 1 slower when int conversion needed

    # get raw peaks
    max_raw, min_raw = peakdetect_zero_crossing(y_axis)

    # define output variable
    max_peaks = []
    min_peaks = []

    # get global offset
    offset = np.mean([np.mean(max_raw, 0)[1], np.mean(min_raw, 0)[1]])
    # fitting a k * x + m function to the peaks might be better
    #offset_func = lambda x, k, m: k * x + m

    # calculate an approximate frequenzy of the signal
    Hz = []
    for raw in [max_raw, min_raw]:
    if len(raw) > 1:
    peak_pos = [x_axis[index] for index in zip(*raw)[0]]
    Hz.append(np.mean(np.diff(peak_pos)))
    Hz = 1 / np.mean(Hz)

    # model function
    # if cosine is used then tau could equal the x position of the peak
    # if sine were to be used then tau would be the first zero crossing
    if lock_frequency:
    func = lambda x, A, tau: A * np.sin(2 * pi * Hz * (x - tau) + pi / 2)
    else:
    func = lambda x, A, Hz, tau: A * np.sin(2 * pi * Hz * (x - tau) + pi / 2)
    #func = lambda x, A, Hz, tau: A * np.cos(2 * pi * Hz * (x - tau))


    #get peaks
    fitted_peaks = []
    for raw_peaks in [max_raw, min_raw]:
    peak_data = []
    for peak in raw_peaks:
    index = peak[0]
    x_data = x_axis[index - points // 2: index + points // 2 + 1]
    y_data = y_axis[index - points // 2: index + points // 2 + 1]
    # get a first approximation of tau (peak position in time)
    tau = x_axis[index]
    # get a first approximation of peak amplitude
    A = peak[1]

    # build list of approximations
    if lock_frequency:
    p0 = (A, tau)
    else:
    p0 = (A, Hz, tau)

    # subtract offset from waveshape
    y_data -= offset
    popt, pcov = curve_fit(func, x_data, y_data, p0)
    # retrieve tau and A i.e x and y value of peak
    x = popt[-1]
    y = popt[0]

    # create a high resolution data set for the fitted waveform
    x2 = np.linspace(x_data[0], x_data[-1], points * 10)
    y2 = func(x2, *popt)

    # add the offset to the results
    y += offset
    y2 += offset
    y_data += offset

    peak_data.append([x, y, [x2, y2]])

    fitted_peaks.append(peak_data)

    # structure date for output
    max_peaks = map(lambda x: [x[0], x[1]], fitted_peaks[0])
    max_fitted = map(lambda x: x[-1], fitted_peaks[0])
    min_peaks = map(lambda x: [x[0], x[1]], fitted_peaks[1])
    min_fitted = map(lambda x: x[-1], fitted_peaks[1])


    #pylab.plot(x_axis, y_axis)
    #pylab.hold(True)
    #for max_p, max_f in zip(max_peaks, max_fitted):
    # pylab.plot(max_p[0], max_p[1], 'x')
    # pylab.plot(max_f[0], max_f[1], 'o', markersize = 2)
    #for min_p, min_f in zip(min_peaks, min_fitted):
    # pylab.plot(min_p[0], min_p[1], 'x')
    # pylab.plot(min_f[0], min_f[1], 'o', markersize = 2)
    #pylab.show()

    return [max_peaks, min_peaks]

    def peakdetect_zero_crossing(y_axis, x_axis = None, window = 49):

    def peakdetect_sine_locked(y_axis, x_axis, points = 9):
    """
    Algorithm for detecting local maximas and minmias in a signal.
    Convinience function for calling the 'peakdetect_sine' function with
    the lock_frequency argument as True.
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the y_axis list and is used
    in the return to specify the postion of the peaks.
    points -- (optional) How many points around the peak should be used during
    curve fitting, must be odd (default: 9)
    return -- see 'peakdetect_sine'
    """
    return peakdetect_sine(y_axis, x_axis, points, True)


    def peakdetect_zero_crossing(y_axis, x_axis = None, window = 11):
    """
    Function for detecting local maximas and minmias in a signal.
    Discovers peaks by dividing the signal into bins and retrieving the
    maximum and minimum value of each the even and odd bins respectively.
    Division into bins is performed by smoothing the curve and finding the
    zero crossings.
    Suitable for repeatable sinusoidal signals with some amount of RMS noise
    tolerable. Excecutes faster than 'peakdetect', although this function will
    break if the offset of the signal is too large. It should also be noted
    that the first and last peak will probably not be found, as this algorithm
    only can find peaks between the first and last zero crossing.
    Suitable for repeatable signals, where some noise is tolerated. Excecutes
    faster than 'peakdetect', although this function will break if the offset
    of the signal is too large. It should also be noted that the first and
    last peak will probably not be found, as this function only can find peaks
    between the first and last zero crossing.
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the 'y_axis' list and is used
    in the return to specify the postion of the peaks. If omitted the index
    of the y_axis is used. (default: None)
    x_axis -- (optional) A x-axis whose values correspond to the y_axis list
    and is used in the return to specify the postion of the peaks. If
    omitted an index of the y_axis is used. (default: None)
    window -- the dimension of the smoothing window; should be an odd integer
    (default: 49)
    return -- two lists [maxtab, mintab] containing the positive and negative
    peaks respectively. Each cell of the lists contains a tupple of:
    (position, peak_value)
    to get the average peak value do 'np.mean(maxtab, 0)[1]' on the results
    (default: 11)
    return -- two lists [max_peaks, min_peaks] containing the positive and
    negative peaks respectively. Each cell of the lists contains a tupple
    of: (position, peak_value)
    to get the average peak value do: np.mean(max_peaks, 0)[1] on the
    results to unpack one of the lists into x, y coordinates do:
    x, y = zip(*tab)
    """
    if x_axis is None:
    x_axis = range(len(y_axis))

    length = len(y_axis)
    if length != len(x_axis):
    raise ValueError, 'Input vectors y_axis and x_axis must have same length'

    #needs to be a numpy array
    y_axis = np.asarray(y_axis)
    # check input data
    x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis)

    zero_indices = zero_crossings(y_axis, window = window)
    period_lengths = np.diff(zero_indices)

    bins = [y_axis[indice:indice+diff] for indice, diff in
    bins_y = [y_axis[index:index + diff] for index, diff in
    zip(zero_indices, period_lengths)]
    bins_x = [x_axis[index:index + diff] for index, diff in
    zip(zero_indices, period_lengths)]

    even_bins_y = bins_y[::2]
    odd_bins_y = bins_y[1::2]
    even_bins_x = bins_x[::2]
    odd_bins_x = bins_x[1::2]
    hi_peaks_x = []
    lo_peaks_x = []

    even_bins = bins[::2]
    odd_bins = bins[1::2]
    #check if even bin contains maxima
    if even_bins[0].max() > abs(even_bins[0].min()):
    hi_peaks = [bin.max() for bin in even_bins]
    lo_peaks = [bin.min() for bin in odd_bins]
    if abs(even_bins_y[0].max()) > abs(even_bins_y[0].min()):
    hi_peaks = [bin.max() for bin in even_bins_y]
    lo_peaks = [bin.min() for bin in odd_bins_y]
    # get x values for peak
    for bin_x, bin_y, peak in zip(even_bins_x, even_bins_y, hi_peaks):
    hi_peaks_x.append(bin_x[np.where(bin_y==peak)[0][0]])
    for bin_x, bin_y, peak in zip(odd_bins_x, odd_bins_y, lo_peaks):
    lo_peaks_x.append(bin_x[np.where(bin_y==peak)[0][0]])
    else:
    hi_peaks = [bin.max() for bin in odd_bins]
    lo_peaks = [bin.min() for bin in even_bins]


    hi_peaks_x = [x_axis[np.where(y_axis==peak)[0]] for peak in hi_peaks]
    lo_peaks_x = [x_axis[np.where(y_axis==peak)[0]] for peak in lo_peaks]

    maxtab = [(x,y) for x,y in zip(hi_peaks, hi_peaks_x)]
    mintab = [(x,y) for x,y in zip(lo_peaks, lo_peaks_x)]

    return maxtab, mintab
    hi_peaks = [bin.max() for bin in odd_bins_y]
    lo_peaks = [bin.min() for bin in even_bins_y]
    # get x values for peak
    for bin_x, bin_y, peak in zip(odd_bins_x, odd_bins_y, hi_peaks):
    hi_peaks_x.append(bin_x[np.where(bin_y==peak)[0][0]])
    for bin_x, bin_y, peak in zip(even_bins_x, even_bins_y, lo_peaks):
    lo_peaks_x.append(bin_x[np.where(bin_y==peak)[0][0]])

    max_peaks = [[x, y] for x,y in zip(hi_peaks_x, hi_peaks)]
    min_peaks = [[x, y] for x,y in zip(lo_peaks_x, lo_peaks)]

    return max_peaks, min_peaks



    def smooth(x,window_len=11,window='hanning'):

    def _smooth(x, window_len=11, window='hanning'):
    """
    smooth the data using a window with requested size.
    smooth the data using a window of the requested size.
    This method is based on the convolution of a scaled window with the signal.
    This method is based on the convolution of a scaled window on the signal.
    The signal is prepared by introducing reflected copies of the signal
    (with the window size) in both ends so that transient parts are minimized
    in the begining and end part of the output signal.
    input:
    x: the input signal
    window_len: the dimension of the smoothing window; should be an odd integer
    window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
    window_len: the dimension of the smoothing window; should be an odd
    integer
    window: the type of window from 'flat', 'hanning', 'hamming',
    'bartlett', 'blackman'
    flat window will produce a moving average smoothing.
    output:
    the smoothed signal
    example:
    t=linspace(-2,2,0.1)
    x=sin(t)+randn(len(t))*0.1
    y=smooth(x)
    t = linspace(-2,2,0.1)
    x = sin(t)+randn(len(t))*0.1
    y = _smooth(x)
    see also:
    numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve
    scipy.signal.lfilter
    numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman,
    numpy.convolve, scipy.signal.lfilter
    TODO: the window parameter could be the window itself if an array instead of a string
    TODO: the window parameter could be the window itself if a list instead of
    a string
    """
    if x.ndim != 1:
    raise ValueError, "smooth only accepts 1 dimension arrays."

    if x.size < window_len:
    raise ValueError, "Input vector needs to be bigger than window size."



    if window_len<3:
    return x



    if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
    raise ValueError, "Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'"


    s=np.r_[x[window_len-1:0:-1],x,x[-1:-window_len:-1]]
    raise(ValueError,
    "Window is not one of '{0}', '{1}', '{2}', '{3}', '{4}'".format(
    *('flat', 'hanning', 'hamming', 'bartlett', 'blackman')))

    s = np.r_[x[window_len-1:0:-1], x, x[-1:-window_len:-1]]
    #print(len(s))
    if window == 'flat': #moving average
    w=np.ones(window_len,'d')
    w = np.ones(window_len,'d')
    else:
    w=eval('np.'+window+'(window_len)')
    w = eval('np.' + window + '(window_len)')

    y=np.convolve(w/w.sum(),s,mode='valid')
    y = np.convolve(w / w.sum(), s, mode = 'valid')
    return y

    def zero_crossings(y_axis, x_axis = None, window = 49):

    def zero_crossings(y_axis, window = 11):
    """
    Algorithm to find zero crossings. Smoothens the curve and finds the
    zero-crossings by looking for a sign change.
    keyword arguments:
    y_axis -- A list containg the signal over which to find zero-crossings
    x_axis -- A x-axis whose values correspond to the 'y_axis' list and is used
    in the return to specify the postion of the zero-crossings. If omitted
    then the indice of the y_axis is used. (default: None)
    window -- the dimension of the smoothing window; should be an odd integer
    (default: 49)
    (default: 11)
    return -- the x_axis value or the indice for each zero-crossing
    return -- the index for each zero-crossing
    """
    #smooth the curve
    # smooth the curve
    length = len(y_axis)
    if x_axis == None:
    x_axis = range(length)

    x_axis = np.asarray(x_axis)
    x_axis = np.asarray(range(length), int)

    y_axis = smooth(y_axis, window)[:length]
    # discard tail of smoothed signal
    y_axis = _smooth(y_axis, window)[:length]
    zero_crossings = np.where(np.diff(np.sign(y_axis)))[0]
    times = [x_axis[indice] for indice in zero_crossings]

    #check if zero-crossings are valid
    diff = np.diff(times)
    if diff.std() / diff.mean() > 0.1:
    raise ValueError, "smoothing window too small, false zero-crossings found"

    return times
    indices = [x_axis[index] for index in zero_crossings]

    # check if zero-crossings are valid
    diff = np.diff(indices)
    if diff.std() / diff.mean() > 0.2:
    print diff.std() / diff.mean()
    print np.diff(indices)
    raise(ValueError,
    "False zero-crossings found, indicates problem {0} or {1}".format(
    "with smoothing window", "problem with offset"))
    # check if any zero crossings were found
    if len(zero_crossings) < 1:
    raise(ValueError, "No zero crossings found")

    return indices
    # used this to test the fft function's sensitivity to spectral leakage
    #return indices + np.asarray(30 * np.random.randn(len(indices)), int)

    ############################Frequency calculation#############################
    # diff = np.diff(indices)
    # time_p_period = diff.mean()
    #
    # if diff.std() / time_p_period > 0.1:
    # raise ValueError,
    # "smoothing window too small, false zero-crossing found"
    #
    # #return frequency
    # return 1.0 / time_p_period
    ##############################################################################


    if __name__=="__main__":
    import pylab
    from math import pi



    def _test_zero():
    _max, _min = peakdetect_zero_crossing(y,x)
    def _test():
    _max, _min = peakdetect(y,x, delta=0.30)


    def _test_graph():
    i = 10000
    x = np.linspace(0,3.7*pi,i)
    y = 0.3*np.sin(x) + np.sin(1.3*x) + 0.9*np.sin(4.2*x) + 0.06*np.random.randn(i)
    y = (0.3*np.sin(x) + np.sin(1.3 * x) + 0.9 * np.sin(4.2 * x) + 0.06 *
    np.random.randn(i))
    y *= -1
    x = range(i)

    @@ -276,4 +691,39 @@ def zero_crossings(y_axis, x_axis = None, window = 49):
    plot = pylab.plot(x,y)
    pylab.hold(True)
    pylab.plot(xm, ym, 'r+')
    pylab.plot(xn, yn, 'g+')
    pylab.plot(xn, yn, 'g+')

    _max, _min = peak_det_bad.peakdetect(y, 0.7, x)
    xm = [p[0] for p in _max]
    ym = [p[1] for p in _max]
    xn = [p[0] for p in _min]
    yn = [p[1] for p in _min]
    pylab.plot(xm, ym, 'y*')
    pylab.plot(xn, yn, 'k*')
    pylab.show()



    if __name__ == "__main__":
    from math import pi
    import pylab

    i = 10000
    x = np.linspace(0,3.7*pi,i)
    y = (0.3*np.sin(x) + np.sin(1.3 * x) + 0.9 * np.sin(4.2 * x) + 0.06 *
    np.random.randn(i))
    y *= -1

    _max, _min = peakdetect(y, x, 750, 0.30)
    xm = [p[0] for p in _max]
    ym = [p[1] for p in _max]
    xn = [p[0] for p in _min]
    yn = [p[1] for p in _min]

    plot = pylab.plot(x, y)
    pylab.hold(True)
    pylab.plot(xm, ym, 'r+')
    pylab.plot(xn, yn, 'g+')


    pylab.show()
  6. sixtenbe revised this gist Aug 29, 2011. 1 changed file with 18 additions and 23 deletions.
    41 changes: 18 additions & 23 deletions peakdetect.py
    Original file line number Diff line number Diff line change
    @@ -1,5 +1,4 @@
    import sys
    from numpy import NaN, Inf, arange, isscalar, asarray
    import numpy as np

    def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0):
    """
    @@ -38,8 +37,13 @@ def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0):
    if x_axis is None:
    x_axis = range(length)

    #perform some checks
    if length != len(x_axis):
    raise ValueError, 'Input vectors y_axis and x_axis must have same length'
    raise ValueError, "Input vectors y_axis and x_axis must have same length"
    if lookahead < 1:
    raise ValueError, "Lookahead must be above '1' in value"
    if not (np.isscalar(delta) and delta >= 0):
    raise ValueError, "delta must be a positive number"

    #needs to be a numpy array
    y_axis = np.asarray(y_axis)
    @@ -67,13 +71,6 @@ def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0):
    #set algorithm to only find minima now
    mx = np.Inf
    mn = np.Inf
    if index+lookahead >= length:
    #end is within lookahead no more peaks can be found
    break
    continue
    #else: #slows shit down this does
    # mx = ahead
    # mxpos = x_axis[np.where(y_axis[index:index+lookahead]==mx)]

    ####look for min####
    if y > mn+delta and mn != -np.Inf:
    @@ -85,22 +82,20 @@ def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0):
    #set algorithm to only find maxima now
    mn = -np.Inf
    mx = -np.Inf
    if index+lookahead >= length:
    #end is within lookahead no more peaks can be found
    break
    #else: #slows shit down this does
    # mn = ahead
    # mnpos = x_axis[np.where(y_axis[index:index+lookahead]==mn)]


    #Remove the false hit on the first value of the y_axis
    if dump[0]:
    maxtab.pop(0)
    #print "pop max"
    else:
    mintab.pop(0)
    #print "pop min"
    del dump
    try:
    if dump[0]:
    maxtab.pop(0)
    #print "pop max"
    else:
    mintab.pop(0)
    #print "pop min"
    del dump
    except IndexError:
    #no peaks were found, should the function return empty lists?
    pass

    return maxtab, mintab

  7. sixtenbe revised this gist Aug 29, 2011. 1 changed file with 5 additions and 3 deletions.
    8 changes: 5 additions & 3 deletions peakdetect.py
    Original file line number Diff line number Diff line change
    @@ -18,9 +18,9 @@ def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0):
    determine if it is the actual peak (default: 500)
    '(sample / period) / f' where '4 >= f >= 1.25' might be a good value
    delta -- (optional) this specifies a minimum difference between a peak and
    the points following it, before a peak may be considered a peak. Useful
    the following points, before a peak may be considered a peak. Useful
    to hinder the algorithm from picking up false peaks towards to end of
    the signal. To work well delta should be set to 'delta >= StdDev * 5'.
    the signal. To work well delta should be set to 'delta >= RMSnoise * 5'.
    (default: 0)
    Delta function causes a 20% decrease in speed, when omitted
    Correctly used it can double the speed of the algorithm
    @@ -116,7 +116,9 @@ def peakdetect_zero_crossing(y_axis, x_axis = None, window = 49):
    Suitable for repeatable sinusoidal signals with some amount of RMS noise
    tolerable. Excecutes faster than 'peakdetect', although this function will
    break if the offset of the signal is too large.
    break if the offset of the signal is too large. It should also be noted
    that the first and last peak will probably not be found, as this algorithm
    only can find peaks between the first and last zero crossing.
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
  8. sixtenbe revised this gist Aug 29, 2011. 1 changed file with 3 additions and 0 deletions.
    3 changes: 3 additions & 0 deletions peakdetect.py
    Original file line number Diff line number Diff line change
    @@ -114,6 +114,9 @@ def peakdetect_zero_crossing(y_axis, x_axis = None, window = 49):
    Division into bins is performed by smoothing the curve and finding the
    zero crossings.
    Suitable for repeatable sinusoidal signals with some amount of RMS noise
    tolerable. Excecutes faster than 'peakdetect', although this function will
    break if the offset of the signal is too large.
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
  9. sixtenbe revised this gist Aug 29, 2011. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion peakdetect.py
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,7 @@
    import sys
    from numpy import NaN, Inf, arange, isscalar, asarray

    def peakdet(v, delta, x = None):
    def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0):
    """
    Converted from/based on a MATLAB script at http://billauer.co.il/peakdet.html
  10. sixtenbe revised this gist Aug 29, 2011. 1 changed file with 258 additions and 56 deletions.
    314 changes: 258 additions & 56 deletions peakdetect.py
    Original file line number Diff line number Diff line change
    @@ -3,75 +3,277 @@

    def peakdet(v, delta, x = None):
    """
    Converted from MATLAB script at http://billauer.co.il/peakdet.html
    Currently returns two lists of tuples, but maybe arrays would be better
    function [maxtab, mintab]=peakdet(v, delta, x)
    %PEAKDET Detect peaks in a vector
    % [MAXTAB, MINTAB] = PEAKDET(V, DELTA) finds the local
    % maxima and minima ("peaks") in the vector V.
    % MAXTAB and MINTAB consists of two columns. Column 1
    % contains indices in V, and column 2 the found values.
    %
    % With [MAXTAB, MINTAB] = PEAKDET(V, DELTA, X) the indices
    % in MAXTAB and MINTAB are replaced with the corresponding
    % X-values.
    %
    % A point is considered a maximum peak if it has the maximal
    % value, and was preceded (to the left) by a value lower by
    % DELTA.
    % Eli Billauer, 3.4.05 (Explicitly not copyrighted).
    % This function is released to the public domain; Any use is allowed.
    Converted from/based on a MATLAB script at http://billauer.co.il/peakdet.html
    Algorithm for detecting local maximas and minmias in a signal.
    Discovers peaks by searching for values which are surrounded by lower
    or larger values for maximas and minimas respectively
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the 'y_axis' list and is used
    in the return to specify the postion of the peaks. If omitted the index
    of the y_axis is used. (default: None)
    lookahead -- (optional) distance to look ahead from a peak candidate to
    determine if it is the actual peak (default: 500)
    '(sample / period) / f' where '4 >= f >= 1.25' might be a good value
    delta -- (optional) this specifies a minimum difference between a peak and
    the points following it, before a peak may be considered a peak. Useful
    to hinder the algorithm from picking up false peaks towards to end of
    the signal. To work well delta should be set to 'delta >= StdDev * 5'.
    (default: 0)
    Delta function causes a 20% decrease in speed, when omitted
    Correctly used it can double the speed of the algorithm
    return -- two lists [maxtab, mintab] containing the positive and negative
    peaks respectively. Each cell of the lists contains a tupple of:
    (position, peak_value)
    to get the average peak value do 'np.mean(maxtab, 0)[1]' on the results
    """
    maxtab = []
    mintab = []
    dump = [] #Used to pop the first hit which always if false

    if x is None:
    x = arange(len(v))

    v = asarray(v)

    if len(v) != len(x):
    sys.exit('Input vectors v and x must have same length')
    length = len(y_axis)
    if x_axis is None:
    x_axis = range(length)

    if not isscalar(delta):
    sys.exit('Input argument delta must be a scalar')
    if length != len(x_axis):
    raise ValueError, 'Input vectors y_axis and x_axis must have same length'

    if delta <= 0:
    sys.exit('Input argument delta must be positive')
    #needs to be a numpy array
    y_axis = np.asarray(y_axis)

    mn, mx = Inf, -Inf
    mnpos, mxpos = NaN, NaN
    #maxima and minima candidates are temporarily stored in
    #mx and mn respectively
    mn, mx = np.Inf, -np.Inf

    lookformax = True

    for i in arange(len(v)):
    this = v[i]
    if this > mx:
    mx = this
    mxpos = x[i]
    if this < mn:
    mn = this
    mnpos = x[i]
    #Only detect peak if there is 'lookahead' amount of points after it
    for index, (x, y) in enumerate(zip(x_axis[:-lookahead], y_axis[:-lookahead])):
    if y > mx:
    mx = y
    mxpos = x
    if y < mn:
    mn = y
    mnpos = x

    if lookformax:
    if this < mx-delta:
    ####look for max####
    if y < mx-delta and mx != np.Inf:
    #Maxima peak candidate found
    #look ahead in signal to ensure that this is a peak and not jitter
    if y_axis[index:index+lookahead].max() < mx:
    maxtab.append((mxpos, mx))
    mn = this
    mnpos = x[i]
    lookformax = False
    else:
    if this > mn+delta:
    dump.append(True)
    #set algorithm to only find minima now
    mx = np.Inf
    mn = np.Inf
    if index+lookahead >= length:
    #end is within lookahead no more peaks can be found
    break
    continue
    #else: #slows shit down this does
    # mx = ahead
    # mxpos = x_axis[np.where(y_axis[index:index+lookahead]==mx)]

    ####look for min####
    if y > mn+delta and mn != -np.Inf:
    #Minima peak candidate found
    #look ahead in signal to ensure that this is a peak and not jitter
    if y_axis[index:index+lookahead].min() > mn:
    mintab.append((mnpos, mn))
    mx = this
    mxpos = x[i]
    lookformax = True
    dump.append(False)
    #set algorithm to only find maxima now
    mn = -np.Inf
    mx = -np.Inf
    if index+lookahead >= length:
    #end is within lookahead no more peaks can be found
    break
    #else: #slows shit down this does
    # mn = ahead
    # mnpos = x_axis[np.where(y_axis[index:index+lookahead]==mn)]


    #Remove the false hit on the first value of the y_axis
    if dump[0]:
    maxtab.pop(0)
    #print "pop max"
    else:
    mintab.pop(0)
    #print "pop min"
    del dump

    return maxtab, mintab



    def peakdetect_zero_crossing(y_axis, x_axis = None, window = 49):
    """
    Algorithm for detecting local maximas and minmias in a signal.
    Discovers peaks by dividing the signal into bins and retrieving the
    maximum and minimum value of each the even and odd bins respectively.
    Division into bins is performed by smoothing the curve and finding the
    zero crossings.
    keyword arguments:
    y_axis -- A list containg the signal over which to find peaks
    x_axis -- A x-axis whose values correspond to the 'y_axis' list and is used
    in the return to specify the postion of the peaks. If omitted the index
    of the y_axis is used. (default: None)
    window -- the dimension of the smoothing window; should be an odd integer
    (default: 49)
    return -- two lists [maxtab, mintab] containing the positive and negative
    peaks respectively. Each cell of the lists contains a tupple of:
    (position, peak_value)
    to get the average peak value do 'np.mean(maxtab, 0)[1]' on the results
    """
    if x_axis is None:
    x_axis = range(len(y_axis))

    length = len(y_axis)
    if length != len(x_axis):
    raise ValueError, 'Input vectors y_axis and x_axis must have same length'

    #needs to be a numpy array
    y_axis = np.asarray(y_axis)

    zero_indices = zero_crossings(y_axis, window = window)
    period_lengths = np.diff(zero_indices)

    bins = [y_axis[indice:indice+diff] for indice, diff in
    zip(zero_indices, period_lengths)]

    even_bins = bins[::2]
    odd_bins = bins[1::2]
    #check if even bin contains maxima
    if even_bins[0].max() > abs(even_bins[0].min()):
    hi_peaks = [bin.max() for bin in even_bins]
    lo_peaks = [bin.min() for bin in odd_bins]
    else:
    hi_peaks = [bin.max() for bin in odd_bins]
    lo_peaks = [bin.min() for bin in even_bins]


    hi_peaks_x = [x_axis[np.where(y_axis==peak)[0]] for peak in hi_peaks]
    lo_peaks_x = [x_axis[np.where(y_axis==peak)[0]] for peak in lo_peaks]

    maxtab = [(x,y) for x,y in zip(hi_peaks, hi_peaks_x)]
    mintab = [(x,y) for x,y in zip(lo_peaks, lo_peaks_x)]

    return maxtab, mintab



    def smooth(x,window_len=11,window='hanning'):
    """
    smooth the data using a window with requested size.
    This method is based on the convolution of a scaled window with the signal.
    The signal is prepared by introducing reflected copies of the signal
    (with the window size) in both ends so that transient parts are minimized
    in the begining and end part of the output signal.
    input:
    x: the input signal
    window_len: the dimension of the smoothing window; should be an odd integer
    window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
    flat window will produce a moving average smoothing.
    output:
    the smoothed signal
    example:
    t=linspace(-2,2,0.1)
    x=sin(t)+randn(len(t))*0.1
    y=smooth(x)
    see also:
    numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve
    scipy.signal.lfilter
    TODO: the window parameter could be the window itself if an array instead of a string
    """
    if x.ndim != 1:
    raise ValueError, "smooth only accepts 1 dimension arrays."

    if x.size < window_len:
    raise ValueError, "Input vector needs to be bigger than window size."


    if window_len<3:
    return x


    if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
    raise ValueError, "Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'"


    s=np.r_[x[window_len-1:0:-1],x,x[-1:-window_len:-1]]
    #print(len(s))
    if window == 'flat': #moving average
    w=np.ones(window_len,'d')
    else:
    w=eval('np.'+window+'(window_len)')

    y=np.convolve(w/w.sum(),s,mode='valid')
    return y


    def zero_crossings(y_axis, x_axis = None, window = 49):
    """
    Algorithm to find zero crossings. Smoothens the curve and finds the
    zero-crossings by looking for a sign change.
    keyword arguments:
    y_axis -- A list containg the signal over which to find zero-crossings
    x_axis -- A x-axis whose values correspond to the 'y_axis' list and is used
    in the return to specify the postion of the zero-crossings. If omitted
    then the indice of the y_axis is used. (default: None)
    window -- the dimension of the smoothing window; should be an odd integer
    (default: 49)
    return -- the x_axis value or the indice for each zero-crossing
    """
    #smooth the curve
    length = len(y_axis)
    if x_axis == None:
    x_axis = range(length)

    x_axis = np.asarray(x_axis)

    y_axis = smooth(y_axis, window)[:length]
    zero_crossings = np.where(np.diff(np.sign(y_axis)))[0]
    times = [x_axis[indice] for indice in zero_crossings]

    #check if zero-crossings are valid
    diff = np.diff(times)
    if diff.std() / diff.mean() > 0.1:
    raise ValueError, "smoothing window too small, false zero-crossings found"

    return times


    if __name__=="__main__":
    series = [0,0,0,2,0,0,0,-2,0,0,0,2,0,0,0,-2,0]
    print peakdet(series,1)
    import pylab
    from math import pi

    i = 10000
    x = np.linspace(0,3.7*pi,i)
    y = 0.3*np.sin(x) + np.sin(1.3*x) + 0.9*np.sin(4.2*x) + 0.06*np.random.randn(i)
    y *= -1
    x = range(i)

    _max, _min = peakdetect(y,x,750, 0.30)
    xm = [p[0] for p in _max]
    ym = [p[1] for p in _max]
    xn = [p[0] for p in _min]
    yn = [p[1] for p in _min]

    plot = pylab.plot(x,y)
    pylab.hold(True)
    pylab.plot(xm, ym, 'r+')
    pylab.plot(xn, yn, 'g+')
  11. @endolith endolith revised this gist Nov 30, 2010. 1 changed file with 4 additions and 0 deletions.
    4 changes: 4 additions & 0 deletions peakdetect.py
    Original file line number Diff line number Diff line change
    @@ -71,3 +71,7 @@ def peakdet(v, delta, x = None):
    lookformax = True

    return maxtab, mintab

    if __name__=="__main__":
    series = [0,0,0,2,0,0,0,-2,0,0,0,2,0,0,0,-2,0]
    print peakdet(series,1)
  12. @endolith endolith revised this gist Nov 30, 2010. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion peakdetect.py
    Original file line number Diff line number Diff line change
    @@ -1,5 +1,5 @@
    import sys
    from numpy import NaN, Inf, arange, isscalar
    from numpy import NaN, Inf, arange, isscalar, asarray

    def peakdet(v, delta, x = None):
    """
  13. @endolith endolith created this gist Dec 7, 2009.
    73 changes: 73 additions & 0 deletions peakdetect.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,73 @@
    import sys
    from numpy import NaN, Inf, arange, isscalar

    def peakdet(v, delta, x = None):
    """
    Converted from MATLAB script at http://billauer.co.il/peakdet.html
    Currently returns two lists of tuples, but maybe arrays would be better
    function [maxtab, mintab]=peakdet(v, delta, x)
    %PEAKDET Detect peaks in a vector
    % [MAXTAB, MINTAB] = PEAKDET(V, DELTA) finds the local
    % maxima and minima ("peaks") in the vector V.
    % MAXTAB and MINTAB consists of two columns. Column 1
    % contains indices in V, and column 2 the found values.
    %
    % With [MAXTAB, MINTAB] = PEAKDET(V, DELTA, X) the indices
    % in MAXTAB and MINTAB are replaced with the corresponding
    % X-values.
    %
    % A point is considered a maximum peak if it has the maximal
    % value, and was preceded (to the left) by a value lower by
    % DELTA.
    % Eli Billauer, 3.4.05 (Explicitly not copyrighted).
    % This function is released to the public domain; Any use is allowed.
    """
    maxtab = []
    mintab = []

    if x is None:
    x = arange(len(v))

    v = asarray(v)

    if len(v) != len(x):
    sys.exit('Input vectors v and x must have same length')

    if not isscalar(delta):
    sys.exit('Input argument delta must be a scalar')

    if delta <= 0:
    sys.exit('Input argument delta must be positive')

    mn, mx = Inf, -Inf
    mnpos, mxpos = NaN, NaN

    lookformax = True

    for i in arange(len(v)):
    this = v[i]
    if this > mx:
    mx = this
    mxpos = x[i]
    if this < mn:
    mn = this
    mnpos = x[i]

    if lookformax:
    if this < mx-delta:
    maxtab.append((mxpos, mx))
    mn = this
    mnpos = x[i]
    lookformax = False
    else:
    if this > mn+delta:
    mintab.append((mnpos, mn))
    mx = this
    mxpos = x[i]
    lookformax = True

    return maxtab, mintab