-
Star
(289)
You must be signed in to star a gist -
Fork
(130)
You must be signed in to fork a gist
-
-
Save sixtenbe/1178136 to your computer and use it in GitHub Desktop.
Revisions
-
sixtenbe revised this gist
May 11, 2016 . 3 changed files with 242 additions and 62 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -83,8 +83,8 @@ def _peakdetect_parabola_fitter(raw_peaks, x_axis, y_axis, points): Performs the actual parabola fitting for the peakdetect_parabola function. keyword arguments: raw_peaks -- A list of either the maxima or the minima peaks, as given by the peakdetect functions, with index used as x-axis x_axis -- A numpy array of all the x values @@ -304,12 +304,14 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 20): zero_indices = zero_crossings(y_axis, window_len = 11) #select a n amount of periods last_indice = - 1 - (1 - len(zero_indices) & 1) ### # Calculate the fft between the first and last zero crossing # this method could be ignored if the beginning and the end of the signal # are unnecessary as any errors induced from not using whole periods # should mainly manifest in the beginning and the end of the signal, but # not in the rest of the signal # this is also unnecessary if the given data is an amount of whole periods ### fft_data = fft(y_axis[zero_indices[0]:zero_indices[last_indice]]) padd = lambda x, c: x[:len(x) // 2] + [0] * c + x[len(x) // 2:] n = lambda x: int(log(x)/log(2)) + 1 @@ -335,20 +337,6 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 20): data_len += 1 - data_len & 1 return [max_peaks, min_peaks] @@ -732,7 +720,8 @@ def _smooth(x, window_len=11, window="hanning"): return y def zero_crossings(y_axis, window_len = 11, window_f="hanning", offset_corrected=False): """ Algorithm to find zero crossings. Smooths the curve and finds the zero-crossings by looking for a sign change. @@ -747,31 +736,39 @@ def zero_crossings(y_axis, window_len = 11, window_f="hanning"): window_f -- the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman' (default: 'hanning') offset_corrected -- Used for recursive calling to remove offset when needed return: the index for each zero-crossing """ # smooth the curve length = len(y_axis) # discard tail of smoothed signal y_axis = _smooth(y_axis, window_len, window_f)[:length] indices = np.where(np.diff(np.sign(y_axis)))[0] # check if zero-crossings are valid diff = np.diff(indices) if diff.std() / diff.mean() > 0.1: #Possibly bad zero crossing, see if it's offsets if ((diff[::2].std() / diff[::2].mean()) < 0.1 and (diff[1::2].std() / diff[1::2].mean()) < 0.1 and not offset_corrected): #offset present attempt to correct by subtracting the average offset = np.mean([y_axis.max(), y_axis.min()]) return zero_crossings(y_axis-offset, window_len, window_f, True) #Invalid zero crossings and the offset has been removed print(diff.std() / diff.mean()) print(np.diff(indices)) raise ValueError( "False zero-crossings found, indicates problem {0!s} or {1!s}".format( "with smoothing window", "unhandled problem with offset")) # check if any zero crossings were found if len(indices) < 1: raise ValueError("No zero crossings found") #remove offset from indices due to filter function when returning return indices - (window_len // 2 - 1) # used this to test the fft function's sensitivity to spectral leakage #return indices + np.asarray(30 * np.random.randn(len(indices)), int) @@ -780,8 +777,8 @@ def zero_crossings(y_axis, window_len = 11, window_f="hanning"): # time_p_period = diff.mean() # # if diff.std() / time_p_period > 0.1: # raise ValueError( # "smoothing window too small, false zero-crossing found") # # #return frequency # return 1.0 / time_p_period This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,7 +1,8 @@ Musings about the peakdetect functions by Sixten Bergman Note that this code should work with both python 2.7 and python3.x. All the peak detection functions in __all__ of peakdetect.py will work on consistent waveforms, but only peakdetect.peakdetect can properly handle offsets. @@ -22,24 +23,31 @@ this as far as I can tell is that the scipy.optimize.curve_fit can't optimize the variables. For parabola fit to function well, it must be fitted to a small section of the peak as the curvature will start to mismatch with the function, but this also means that the parabola should be quite sensitive to noise FFT interpolation has between 0 to 2 orders of magnitude improvement over a raw peak fit. To obtain this improvement the wave needs to be heavily padded in length Spline seems to have similar performance to a FFT interpolation of the time domain. Spline does however seem to be better at estimating amplitude than the FFT method, but is unknown if this will hold true for wave-shapes that are noisy. It should also be noted that the errors as given in "Missmatch data.txt" generated by the test routine are for pure functions with no noise, so the only error being reduced by the "non-raw" peakdetect functions are errors stemming low time resolution and are in no way an indication of how the functions can handle any kind of noise that real signals will have. Automatic tests for sine fitted peak detection is disabled due to it's problems Avoid using the following functions as they're questionable in performance: peakdetect_sine peakdetect_sine_locked This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -21,14 +21,22 @@ linspace_standard = np.linspace(0, 0.10, 1000) linspace_peakdetect = np.linspace(0, 0.10, 10000) def prng(): """ A numpy random number generator with a known starting state return: a random number generator """ return np.random.RandomState(773889874) def _write_log(file, header, message): with open(file, "ab") as f: f.write(header) f.write("\n") f.writelines(message) f.write("\n") f.write("\n") def _calculate_missmatch(received, expected): @@ -59,7 +67,17 @@ def _log_diff(t_max, y_max, file, name ): """ keyword arguments: t_max -- time of maxima y_max -- amplitude of maxima t_min -- time of minima y_min -- amplitude of maxima t_max_expected -- expected time of maxima y_max_expected -- expected amplitude of maxima t_min_expected -- expected time of minima y_min_expected -- expected amplitude of maxima file -- log file to write to name -- name of the test performed """ t_diff_h, a_diff_h = _calculate_missmatch([t_max, y_max], [t_max_expected, y_max_expected]) @@ -68,14 +86,18 @@ def _log_diff(t_max, y_max, t_diff_l, a_diff_l = _calculate_missmatch([t_min, y_min], [t_min_expected, y_min_expected]) #data = ["\t{0:.2e}\t{1:.2e}\t{2:.2e}\t{3:.2e}".format(*d) for d in # [t_diff_h, t_diff_l, a_diff_h, a_diff_l] # ] frt = "val:{0} error:{1:.2e}" data = ["\t{0}".format("\t".join(map(frt.format, val, err))) for val, err in [(t_max, t_diff_h), (t_min, t_diff_l), (y_max, a_diff_h), (y_min, a_diff_l)] ] _write_log(file, name, "\n".join(data)) @@ -93,11 +115,17 @@ def _is_close(max_p, min_p, expected_min -- expected location and value of minima atol_time -- absolute tolerance of location of vertex tol_ampl -- relative tolerance of value of vertex file -- log file to write to name -- name of the test performed """ if len(max_p) == 5: t_max_expected, y_max_expected = zip(*expected_max) else: if abs(max_p[0][0] - expected_max[0][0]) > 0.001: t_max_expected, y_max_expected = zip(*expected_max[1:]) else: t_max_expected, y_max_expected = zip(*expected_max[:-1]) if len(min_p) == 5: t_min_expected, y_min_expected = zip(*expected_min) else: @@ -243,11 +271,11 @@ def test_peak_ACV1(self): peak_pos = 1000*np.sqrt(2) #1414.2135623730951 peak_neg = -peak_pos expected_max = [ (0.005, peak_pos), (0.025, peak_pos), (0.045, peak_pos), (0.065, peak_pos), (0.085, peak_pos) ] expected_min = [ (0.015, peak_neg), @@ -265,15 +293,15 @@ def test_peak_ACV1(self): atol_time, tol_ampl ) def test_peak_ACV2(self): peak_pos = 1000*np.sqrt(2) + 500 #1414.2135623730951 + 500 peak_neg = (-1000*np.sqrt(2)) + 500 #-914.2135623730951 expected_max = [ (0.005, peak_pos), (0.025, peak_pos), (0.045, peak_pos), (0.065, peak_pos), (0.085, peak_pos) ] expected_min = [ (0.015, peak_neg), @@ -283,7 +311,7 @@ def _test_peak_ACV2(self): (0.095, peak_neg) ] atol_time = 1e-5 tol_ampl = 2e-6 self._test_peak_template(analytic_wfm.ACV_A2, expected_max, expected_min, @@ -294,6 +322,8 @@ def _test_peak_ACV2(self): def test_peak_ACV3(self): """ Sine wave with a 3rd overtone WolframAlpha solution max{y = sin(100 pi x)+0.05 sin(400 pi x+(2 pi)/3)}~~ @@ -374,6 +404,143 @@ def peak_neg(n): atol_time, tol_ampl ) def test_peak_ACV4(self): """ Sine wave with a 4th overtone Expected data is from a numerical solution using 1e8 samples The numerical solution used about 2 GB memory and required 64-bit python Test is currently disabled as it pushes time index forward enough to change what peaks are discovers by peakdetect_fft, such that the last maxima is lost instead of the first one, which is expected from all the other functions """ expected_max = [ (0.0059351920593519207, 1409.2119572886963), (0.025935191259351911, 1409.2119572887088), (0.045935191459351918, 1409.2119572887223), (0.065935191659351911, 1409.2119572887243), (0.085935191859351917, 1409.2119572887166) ] expected_min = [ (0.015935191159351911, -1409.2119572886984), (0.035935191359351915, -1409.2119572887166), (0.055935191559351914, -1409.2119572887245), (0.075935191759351914, -1409.2119572887223), (0.09593519195935192, -1409.2119572887068) ] atol_time = 1e-5 tol_ampl = 2.5e-6 #reduced tolerance since the expected values are only approximated self._test_peak_template(analytic_wfm.ACV_A4, expected_max, expected_min, "ACV4", atol_time, tol_ampl ) def test_peak_ACV5(self): """ Realistic triangle wave Easy enough to solve, but here is the numerical solution from 1e8 samples. Numerical solution used about 2 GB memory and required 64-bit python expected_max = [ [0.0050000000500000008, 1598.0613254815967] [0.025000000250000001, 1598.0613254815778], [0.045000000450000008, 1598.0613254815346], [0.064999999650000001, 1598.0613254815594], [0.084999999849999994, 1598.0613254815908] ] expected_min = [ [0.015000000150000001, -1598.0613254815908], [0.035000000350000005, -1598.0613254815594], [0.054999999549999998, -1598.0613254815346], [0.074999999750000004, -1598.0613254815778], [0.094999999949999997, -1598.0613254815967] ] """ peak_pos = 1130*np.sqrt(2) #1598.0613254815976 peak_neg = -1130*np.sqrt(2) #-1598.0613254815967 expected_max = [ (0.005, peak_pos), (0.025, peak_pos), (0.045, peak_pos), (0.065, peak_pos), (0.085, peak_pos) ] expected_min = [ (0.015, peak_neg), (0.035, peak_neg), (0.055, peak_neg), (0.075, peak_neg), (0.095, peak_neg) ] atol_time = 1e-5 tol_ampl = 4e-6 self._test_peak_template(analytic_wfm.ACV_A5, expected_max, expected_min, "ACV5", atol_time, tol_ampl ) def test_peak_ACV6(self): """ Realistic triangle wave Easy enough to solve, but here is the numerical solution from 1e8 samples. Numerical solution used about 2 GB memory and required 64-bit python expected_max = [ [0.0050000000500000008, 1485.6313472729362], [0.025000000250000001, 1485.6313472729255], [0.045000000450000008, 1485.6313472729012], [0.064999999650000001, 1485.6313472729153], [0.084999999849999994, 1485.6313472729323] ] expected_min = [ [0.015000000150000001, -1485.6313472729323], [0.035000000350000005, -1485.6313472729153], [0.054999999549999998, -1485.6313472729012], [0.074999999750000004, -1485.6313472729255], [0.094999999949999997, -1485.6313472729362] ] """ peak_pos = 1050.5*np.sqrt(2) #1485.6313472729364 peak_neg = -1050.5*np.sqrt(2) #1485.6313472729255 expected_max = [ (0.005, peak_pos), (0.025, peak_pos), (0.045, peak_pos), (0.065, peak_pos), (0.085, peak_pos) ] expected_min = [ (0.015, peak_neg), (0.035, peak_neg), (0.055, peak_neg), (0.075, peak_neg), (0.095, peak_neg) ] atol_time = 1e-5 tol_ampl = 2.5e-6 self._test_peak_template(analytic_wfm.ACV_A6, expected_max, expected_min, "ACV6", atol_time, tol_ampl ) class Test_peakdetect(_Test_peakdetect_template): @@ -440,6 +607,14 @@ def test__pad(self): def test__n(self): self.assertEqual(2**peakdetect._n(1000), 1024) def test_zero_crossings(self): y = analytic_wfm.ACV_A1(linspace_peakdetect) expected_indice = [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000] indice = peakdetect.zero_crossings(y, 50) msg = "index:{0:d} should be within 1 of expected:{1:d}" for rec, exp in zip(indice, expected_indice): self.assertAlmostEqual(rec, exp, delta=1, msg=msg.format(rec, exp)) @@ -449,9 +624,9 @@ def test__n(self): if __name__ == "__main__": tests_to_run = [ #Test_analytic_wfm, Test_peakdetect, Test_peakdetect_parabola, Test_peakdetect_fft, -
sixtenbe revised this gist
Apr 23, 2016 . 1 changed file with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -61,7 +61,7 @@ def ACV_A2(T, Hz=50): def ACV_A3(T, Hz=50): """ Generate a fundamental with a 3rd overtone keyword arguments: T -- time points to generate the waveform given in seconds @@ -76,7 +76,7 @@ def ACV_A3(T, Hz=50): def ACV_A4(T, Hz=50): """ Generate a fundamental with a 4th overtone keyword arguments: T -- time points to generate the waveform given in seconds -
sixtenbe revised this gist
Apr 23, 2016 . 4 changed files with 1142 additions and 182 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,203 @@ #!/usr/bin/python2 # Copyright (C) 2016 Sixten Bergman # License WTFPL # # This program is free software. It comes without any warranty, to the extent # permitted by applicable law. # You can redistribute it and/or modify it under the terms of the Do What The # Fuck You Want To Public License, Version 2, as published by Sam Hocevar. See # http://www.wtfpl.net/ for more details. # import numpy as np from math import pi, sqrt __all__ = [ 'ACV_A1', 'ACV_A2', 'ACV_A3', 'ACV_A4', 'ACV_A5', 'ACV_A6', 'ACV_A7', 'ACV_A8' ] #Heavyside step function H_num = lambda t: 1 if t > 0 else 0 H = lambda T: np.asarray([1 if t > 0 else 0 for t in T]) # pure sine def ACV_A1(T, Hz=50): """ Generate a pure sine wave at a specified frequency keyword arguments: T -- time points to generate the waveform given in seconds Hz -- The desired frequency of the signal (default:50) """ ampl = 1000 T = np.asarray(T, dtype=np.float64) return ampl * sqrt(2) * np.sin(2*pi*Hz * T) def ACV_A2(T, Hz=50): """ Generate a pure sine wave with a DC offset at a specified frequency keyword arguments: T -- time points to generate the waveform given in seconds Hz -- The desired frequency of the signal (default:50) """ ampl = 1000 offset = 500 T = np.asarray(T, dtype=np.float64) return ampl * sqrt(2) * np.sin(2*pi*Hz * T) + offset def ACV_A3(T, Hz=50): """ Generate a keyword arguments: T -- time points to generate the waveform given in seconds Hz -- The desired frequency of the signal (default:50) """ ampl = 1000 T = np.asarray(T, dtype=np.float64) main_wave = np.sin(2*pi*Hz * T) harmonic_wave = 0.05 * np.sin(2*pi*Hz * T * 4 + pi * 2 / 3) return ampl * sqrt(2) * (main_wave + harmonic_wave) def ACV_A4(T, Hz=50): """ Generate a keyword arguments: T -- time points to generate the waveform given in seconds Hz -- The desired frequency of the signal (default:50) """ ampl = 1000 T = np.asarray(T, dtype=np.float64) main_wave = np.sin(2*pi*Hz * T) harmonic_wave = 0.07 * np.sin(2*pi*Hz * T * 5 + pi * 22 / 18) return ampl * sqrt(2) * (main_wave + harmonic_wave) def ACV_A5(T, Hz=50): """ Generate a realistic triangle wave keyword arguments: T -- time points to generate the waveform given in seconds Hz -- The desired frequency of the signal (default:50) """ ampl = 1000 T = np.asarray(T, dtype=np.float64) wave_1 = np.sin(2*pi*Hz * T) wave_2 = 0.05 * np.sin(2*pi*Hz * T * 3 - pi) wave_3 = 0.05 * np.sin(2*pi*Hz * T * 5) wave_4 = 0.02 * np.sin(2*pi*Hz * T * 7 - pi) wave_5 = 0.01 * np.sin(2*pi*Hz * T * 9) return ampl * sqrt(2) * (wave_1 + wave_2 + wave_3 + wave_4 + wave_5) def ACV_A6(T, Hz=50): """ Generate a realistic triangle wave keyword arguments: T -- time points to generate the waveform given in seconds Hz -- The desired frequency of the signal (default:50) """ ampl = 1000 T = np.asarray(T, dtype=np.float64) wave_1 = np.sin(2*pi*Hz * T) wave_2 = 0.02 * np.sin(2*pi*Hz * T * 3 - pi) wave_3 = 0.02 * np.sin(2*pi*Hz * T * 5) wave_4 = 0.0015 * np.sin(2*pi*Hz * T * 7 - pi) wave_5 = 0.009 * np.sin(2*pi*Hz * T * 9) return ampl * sqrt(2) * (wave_1 + wave_2 + wave_3 + wave_4 + wave_5) def ACV_A7(T, Hz=50): """ Generate a growing sine wave, where the wave starts at 0 and reaches 0.9 of full amplitude at 250 cycles. Thereafter it will linearly increase to full amplitude at 500 cycles and terminate to 0 Frequency locked to 50Hz and = 0 at t>10 keyword arguments: T -- time points to generate the waveform given in seconds Hz -- The desired frequency of the signal (default:50) """ ampl = 1000 Hz = 50 T = np.asarray(T, dtype=np.float64) wave_main = np.sin(2*pi*Hz * T) step_func = (0.9 * T / 5 * H(5-T) + H(T-5) * H(10-T) * (0.9 + 0.1 * (T-5) / 5)) return ampl * sqrt(2) * wave_main * step_func def ACV_A8(T, Hz=50): """ Generate a growing sine wave, which reaches 100 times the amplitude at 500 cycles frequency not implemented and signal = 0 at t>1000*pi signal frequency = 0.15915494309189535 Hz? keyword arguments: T -- time points to generate the waveform given in seconds Hz -- The desired frequency of the signal (default:50) """ ampl = 1000 Hz = 50 T = np.asarray(T, dtype=np.float64) wave_main = np.sin(T) step_func = T / (10 * pi) * H(10 - T / (2*pi*Hz)) return ampl * sqrt(2) * wave_main * step_func _ACV_A1_L = lambda T, Hz = 50: 1000 * sqrt(2) * np.sin(2*pi*Hz * T) # _ACV_A2_L = lambda T, Hz = 50: 1000 * sqrt(2) * np.sin(2*pi*Hz * T) + 500 # _ACV_A3_L = lambda T, Hz = 50: 1000 * sqrt(2) * (np.sin(2*pi*Hz * T) + 0.05 * np.sin(2*pi*Hz * T * 4 + pi * 2 / 3)) # _ACV_A4_L = lambda T, Hz = 50:( 1000 * sqrt(2) * (np.sin(2*pi*Hz * T) + 0.07 * np.sin(2*pi*Hz * T * 5 + pi * 22 / 18))) # Realistic triangle _ACV_A5_L = lambda T, Hz = 50:( 1000 * sqrt(2) * (np.sin(2*pi*Hz * T) + 0.05 * np.sin(2*pi*Hz * T * 3 - pi) + 0.05 * np.sin(2*pi*Hz * T * 5) + 0.02 * np.sin(2*pi*Hz * T * 7 - pi) + 0.01 * np.sin(2*pi*Hz * T * 9))) # _ACV_A6_L = lambda T, Hz = 50:( 1000 * sqrt(2) * (np.sin(2*pi*Hz * T) + 0.02 * np.sin(2*pi*Hz * T * 3 - pi) + 0.02 * np.sin(2*pi*Hz * T * 5) + 0.0015 * np.sin(2*pi*Hz * T * 7 - pi) + 0.009 * np.sin(2*pi*Hz * T * 9))) #A7 & A8 convert so that a input of 16*pi corresponds to a input 0.25 in the current version _ACV_A7_OLD = lambda T: [1000 * sqrt(2) * np.sin(100 * pi * t) * (0.9 * t / 5 * H_num(5-t) + H_num(t-5) * H_num(10-t) * (0.9 + 0.1 * (t-5) / 5)) for t in T] _ACV_A8_OLD = lambda T: [1000 * sqrt(2) * np.sin(t) * t / (10 * pi) * H_num(10 - t / (100 * pi)) for t in T] if __name__ == "__main__": #create 1 period triangle x = np.linspace(0, 0.02, 4000) y = ACV_A5(x)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,60 +1,122 @@ #!/usr/bin/python2 # Copyright (C) 2016 Sixten Bergman # License WTFPL # # This program is free software. It comes without any warranty, to the extent # permitted by applicable law. # You can redistribute it and/or modify it under the terms of the Do What The # Fuck You Want To Public License, Version 2, as published by Sam Hocevar. See # http://www.wtfpl.net/ for more details. # # note that the function peakdetect is derived from code which was released to # public domain see: http://billauer.co.il/peakdet.html # import logging from math import pi, log import numpy as np import pylab from scipy import fft, ifft from scipy.optimize import curve_fit from scipy.signal import cspline1d_eval, cspline1d __all__ = [ "peakdetect", "peakdetect_fft", "peakdetect_parabola", "peakdetect_sine", "peakdetect_sine_locked", "peakdetect_spline", "peakdetect_zero_crossing", "zero_crossings", "zero_crossings_sine_fit" ] def _datacheck_peakdetect(x_axis, y_axis): if x_axis is None: x_axis = range(len(y_axis)) if len(y_axis) != len(x_axis): raise ValueError( "Input vectors y_axis and x_axis must have same length") #needs to be a numpy array y_axis = np.array(y_axis) x_axis = np.array(x_axis) return x_axis, y_axis def _pad(fft_data, pad_len): """ Pads fft data to interpolate in time domain keyword arguments: fft_data -- the fft pad_len -- By how many times the time resolution should be increased by return: padded list """ l = len(fft_data) n = _n(l * pad_len) fft_data = list(fft_data) return fft_data[:l // 2] + [0] * (2**n-l) + fft_data[l // 2:] def _n(x): """ Find the smallest value for n, which fulfils 2**n >= x keyword arguments: x -- the value, which 2**n must surpass return: the integer n """ return int(log(x)/log(2)) + 1 def _peakdetect_parabola_fitter(raw_peaks, x_axis, y_axis, points): """ Performs the actual parabola fitting for the peakdetect_parabola function. keyword arguments: raw_peaks -- A list of either the maximum or the minimum peaks, as given by the peakdetect_zero_crossing function, with index used as x-axis x_axis -- A numpy array of all the x values y_axis -- A numpy array of all the y values points -- How many points around the peak should be used during curve fitting, must be odd. return: A list giving all the peaks and the fitted waveform, format: [[x, y, [fitted_x, fitted_y]]] """ func = lambda x, a, tau, c: a * ((x - tau) ** 2) + c fitted_peaks = [] distance = abs(x_axis[raw_peaks[1][0]] - x_axis[raw_peaks[0][0]]) / 4 for peak in raw_peaks: index = peak[0] x_data = x_axis[index - points // 2: index + points // 2 + 1] y_data = y_axis[index - points // 2: index + points // 2 + 1] # get a first approximation of tau (peak position in time) tau = x_axis[index] # get a first approximation of peak amplitude c = peak[1] a = np.sign(c) * (-1) * (np.sqrt(abs(c))/distance)**2 """Derived from ABC formula to result in a solution where A=(rot(c)/t)**2""" # build list of approximations p0 = (a, tau, c) popt, pcov = curve_fit(func, x_data, y_data, p0) # retrieve tau and c i.e x and y value of peak x, y = popt[1:3] # create a high resolution data set for the fitted waveform @@ -66,37 +128,53 @@ def _peakdetect_parabole_fitter(raw_peaks, x_axis, y_axis, points): return fitted_peaks def peakdetect_parabole(*args, **kwargs): """ Misspelling of peakdetect_parabola function is deprecated please use peakdetect_parabola """ logging.warn("peakdetect_parabole is deprecated due to misspelling use: peakdetect_parabola") return peakdetect_parabola(*args, **kwargs) def peakdetect(y_axis, x_axis = None, lookahead = 200, delta=0): """ Converted from/based on a MATLAB script at: http://billauer.co.il/peakdet.html function for detecting local maxima and minima in a signal. Discovers peaks by searching for values which are surrounded by lower or larger values for maxima and minima respectively keyword arguments: y_axis -- A list containing the signal over which to find peaks x_axis -- A x-axis whose values correspond to the y_axis list and is used in the return to specify the position of the peaks. If omitted an index of the y_axis is used. (default: None) lookahead -- distance to look ahead from a peak candidate to determine if it is the actual peak (default: 200) '(samples / period) / f' where '4 >= f >= 1.25' might be a good value delta -- this specifies a minimum difference between a peak and the following points, before a peak may be considered a peak. Useful to hinder the function from picking up false peaks towards to end of the signal. To work well delta should be set to delta >= RMSnoise * 5. (default: 0) When omitted delta function causes a 20% decrease in speed. When used Correctly it can double the speed of the function return: two lists [max_peaks, min_peaks] containing the positive and negative peaks respectively. Each cell of the lists contains a tuple of: (position, peak_value) to get the average peak value do: np.mean(max_peaks, 0)[1] on the results to unpack one of the lists into x, y coordinates do: x, y = zip(*max_peaks) """ max_peaks = [] min_peaks = [] @@ -110,9 +188,9 @@ def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0): #perform some checks if lookahead < 1: raise ValueError("Lookahead must be '1' or above in value") if not (np.isscalar(delta) and delta >= 0): raise ValueError("delta must be a positive number") #maxima and minima candidates are temporarily stored in #mx and mn respectively @@ -178,7 +256,7 @@ def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0): return [max_peaks, min_peaks] def peakdetect_fft(y_axis, x_axis, pad_len = 20): """ Performs a FFT calculation on the data and zero-pads the results to increase the time domain resolution after performing the inverse fft and @@ -196,41 +274,46 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 5): The biggest time eater in this function is the ifft and thereafter it's the 'peakdetect' function which takes only half the time of the ifft. Speed improvements could include to check if 2**n points could be used for fft and ifft or change the 'peakdetect' to the 'peakdetect_zero_crossing', which is maybe 10 times faster than 'peakdetct'. The pro of 'peakdetect' is that it results in one less lost peak. It should also be noted that the time used by the ifft function can change greatly depending on the input. keyword arguments: y_axis -- A list containing the signal over which to find peaks x_axis -- A x-axis whose values correspond to the y_axis list and is used in the return to specify the position of the peaks. pad_len -- By how many times the time resolution should be increased by, e.g. 1 doubles the resolution. The amount is rounded up to the nearest 2**n amount (default: 20) return: two lists [max_peaks, min_peaks] containing the positive and negative peaks respectively. Each cell of the lists contains a tuple of: (position, peak_value) to get the average peak value do: np.mean(max_peaks, 0)[1] on the results to unpack one of the lists into x, y coordinates do: x, y = zip(*max_peaks) """ # check input data x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) zero_indices = zero_crossings(y_axis, window_len = 11) #select a n amount of periods last_indice = - 1 - (1 - len(zero_indices) & 1) # Calculate the fft between the first and last zero crossing # this method could be ignored if the beginning and the end of the signal # are unnecessary as any errors induced from not using whole periods # should mainly manifest in the beginning and the end of the signal, but # not in the rest of the signal # this is also unnecessary if the given data is a amount of whole periods fft_data = fft(y_axis[zero_indices[0]:zero_indices[last_indice]]) padd = lambda x, c: x[:len(x) // 2] + [0] * c + x[len(x) // 2:] n = lambda x: int(log(x)/log(2)) + 1 # pads to 2**n amount of samples fft_padded = padd(list(fft_data), 2 ** n(len(fft_data) * pad_len) - len(fft_data)) @@ -266,40 +349,34 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 5): peak_fit_tmp.append([x_fit_lim, y_fit_lim]) fitted_wave.append(peak_fit_tmp) return [max_peaks, min_peaks] def peakdetect_parabola(y_axis, x_axis, points = 31): """ Function for detecting local maxima and minima in a signal. Discovers peaks by fitting the model function: y = k (x - tau) ** 2 + m to the peaks. The amount of points used in the fitting is set by the points argument. Omitting the x_axis is forbidden as it would make the resulting x_axis value silly, if it was returned as index 50.234 or similar. will find the same amount of peaks as the 'peakdetect_zero_crossing' function, but might result in a more precise value of the peak. keyword arguments: y_axis -- A list containing the signal over which to find peaks x_axis -- A x-axis whose values correspond to the y_axis list and is used in the return to specify the position of the peaks. points -- How many points around the peak should be used during curve fitting (default: 31) return: two lists [max_peaks, min_peaks] containing the positive and negative peaks respectively. Each cell of the lists contains a tuple of: (position, peak_value) to get the average peak value do: np.mean(max_peaks, 0)[1] on the results to unpack one of the lists into x, y coordinates do: @@ -318,33 +395,22 @@ def peakdetect_parabole(y_axis, x_axis, points = 9): max_peaks = [] min_peaks = [] max_ = _peakdetect_parabola_fitter(max_raw, x_axis, y_axis, points) min_ = _peakdetect_parabola_fitter(min_raw, x_axis, y_axis, points) max_peaks = map(lambda x: [x[0], x[1]], max_) max_fitted = map(lambda x: x[-1], max_) min_peaks = map(lambda x: [x[0], x[1]], min_) min_fitted = map(lambda x: x[-1], min_) return [max_peaks, min_peaks] def peakdetect_sine(y_axis, x_axis, points = 31, lock_frequency = False): """ Function for detecting local maxima and minima in a signal. Discovers peaks by fitting the model function: y = A * sin(2 * pi * f * (x - tau)) to the peaks. The amount of points used in the fitting is set by the points argument. Omitting the x_axis is forbidden as it would make the resulting x_axis @@ -356,24 +422,29 @@ def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False): The function might have some problems if the sine wave has a non-negligible total angle i.e. a k*x component, as this messes with the internal offset calculation of the peaks, might be fixed by fitting a y = k * x + m function to the peaks for offset calculation. keyword arguments: y_axis -- A list containing the signal over which to find peaks x_axis -- A x-axis whose values correspond to the y_axis list and is used in the return to specify the position of the peaks. points -- How many points around the peak should be used during curve fitting (default: 31) lock_frequency -- Specifies if the frequency argument of the model function should be locked to the value calculated from the raw peaks or if optimization process may tinker with it. (default: False) return: two lists [max_peaks, min_peaks] containing the positive and negative peaks respectively. Each cell of the lists contains a tuple of: (position, peak_value) to get the average peak value do: np.mean(max_peaks, 0)[1] on the results to unpack one of the lists into x, y coordinates do: x, y = zip(*max_peaks) """ # check input data x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) @@ -393,23 +464,23 @@ def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False): # fitting a k * x + m function to the peaks might be better #offset_func = lambda x, k, m: k * x + m # calculate an approximate frequency of the signal Hz_h_peak = np.diff(zip(*max_raw)[0]).mean() Hz_l_peak = np.diff(zip(*min_raw)[0]).mean() Hz = 1 / np.mean([Hz_h_peak, Hz_l_peak]) # model function # if cosine is used then tau could equal the x position of the peak # if sine were to be used then tau would be the first zero crossing if lock_frequency: func = lambda x_ax, A, tau: A * np.sin( 2 * pi * Hz * (x_ax - tau) + pi / 2) else: func = lambda x_ax, A, Hz, tau: A * np.sin( 2 * pi * Hz * (x_ax - tau) + pi / 2) #func = lambda x_ax, A, Hz, tau: A * np.cos(2 * pi * Hz * (x_ax - tau)) #get peaks @@ -431,7 +502,7 @@ def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False): else: p0 = (A, Hz, tau) # subtract offset from wave-shape y_data -= offset popt, pcov = curve_fit(func, x_data, y_data, p0) # retrieve tau and A i.e x and y value of peak @@ -458,69 +529,108 @@ def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False): min_fitted = map(lambda x: x[-1], fitted_peaks[1]) return [max_peaks, min_peaks] def peakdetect_sine_locked(y_axis, x_axis, points = 31): """ Convenience function for calling the 'peakdetect_sine' function with the lock_frequency argument as True. keyword arguments: y_axis -- A list containing the signal over which to find peaks x_axis -- A x-axis whose values correspond to the y_axis list and is used in the return to specify the position of the peaks. points -- How many points around the peak should be used during curve fitting (default: 31) return: see the function 'peakdetect_sine' """ return peakdetect_sine(y_axis, x_axis, points, True) def peakdetect_spline(y_axis, x_axis, pad_len=20): """ Performs a b-spline interpolation on the data to increase resolution and send the data to the 'peakdetect_zero_crossing' function for peak detection. Omitting the x_axis is forbidden as it would make the resulting x_axis value silly if it was returned as the index 50.234 or similar. will find the same amount of peaks as the 'peakdetect_zero_crossing' function, but might result in a more precise value of the peak. keyword arguments: y_axis -- A list containing the signal over which to find peaks x_axis -- A x-axis whose values correspond to the y_axis list and is used in the return to specify the position of the peaks. x-axis must be equally spaced. pad_len -- By how many times the time resolution should be increased by, e.g. 1 doubles the resolution. (default: 20) return: two lists [max_peaks, min_peaks] containing the positive and negative peaks respectively. Each cell of the lists contains a tuple of: (position, peak_value) to get the average peak value do: np.mean(max_peaks, 0)[1] on the results to unpack one of the lists into x, y coordinates do: x, y = zip(*max_peaks) """ # check input data x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) # could perform a check if x_axis is equally spaced #if np.std(np.diff(x_axis)) > 1e-15: raise ValueError # perform spline interpolations dx = x_axis[1] - x_axis[0] x_interpolated = np.linspace(x_axis.min(), x_axis.max(), len(x_axis) * (pad_len + 1)) cj = cspline1d(y_axis) y_interpolated = cspline1d_eval(cj, x_interpolated, dx=dx,x0=x_axis[0]) # get peaks max_peaks, min_peaks = peakdetect_zero_crossing(y_interpolated, x_interpolated) return [max_peaks, min_peaks] def peakdetect_zero_crossing(y_axis, x_axis = None, window = 11): """ Function for detecting local maxima and minima in a signal. Discovers peaks by dividing the signal into bins and retrieving the maximum and minimum value of each the even and odd bins respectively. Division into bins is performed by smoothing the curve and finding the zero crossings. Suitable for repeatable signals, where some noise is tolerated. Executes faster than 'peakdetect', although this function will break if the offset of the signal is too large. It should also be noted that the first and last peak will probably not be found, as this function only can find peaks between the first and last zero crossing. keyword arguments: y_axis -- A list containing the signal over which to find peaks x_axis -- A x-axis whose values correspond to the y_axis list and is used in the return to specify the position of the peaks. If omitted an index of the y_axis is used. (default: None) window -- the dimension of the smoothing window; should be an odd integer (default: 11) return: two lists [max_peaks, min_peaks] containing the positive and negative peaks respectively. Each cell of the lists contains a tuple of: (position, peak_value) to get the average peak value do: np.mean(max_peaks, 0)[1] on the results to unpack one of the lists into x, y coordinates do: x, y = zip(*max_peaks) """ # check input data x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) zero_indices = zero_crossings(y_axis, window_len = window) period_lengths = np.diff(zero_indices) bins_y = [y_axis[index:index + diff] for index, diff in @@ -559,25 +669,27 @@ def peakdetect_zero_crossing(y_axis, x_axis = None, window = 11): return [max_peaks, min_peaks] def _smooth(x, window_len=11, window="hanning"): """ smooth the data using a window of the requested size. This method is based on the convolution of a scaled window on the signal. The signal is prepared by introducing reflected copies of the signal (with the window size) in both ends so that transient parts are minimized in the beginning and end part of the output signal. keyword arguments: x -- the input signal window_len -- the dimension of the smoothing window; should be an odd integer (default: 11) window -- the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman', where flat is a moving average (default: 'hanning') return: the smoothed signal example: @@ -588,55 +700,62 @@ def _smooth(x, window_len=11, window='hanning'): see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve, scipy.signal.lfilter """ if x.ndim != 1: raise ValueError("smooth only accepts 1 dimension arrays.") if x.size < window_len: raise ValueError("Input vector needs to be bigger than window size.") if window_len<3: return x #declare valid windows in a dictionary window_funcs = { "flat": lambda _len: np.ones(_len, "d"), "hanning": np.hanning, "hamming": np.hamming, "bartlett": np.bartlett, "blackman": np.blackman } s = np.r_[x[window_len-1:0:-1], x, x[-1:-window_len:-1]] try: w = window_funcs[window](window_len) except KeyError: raise ValueError( "Window is not one of '{0}', '{1}', '{2}', '{3}', '{4}'".format( *window_funcs.keys())) y = np.convolve(w / w.sum(), s, mode = "valid") return y def zero_crossings(y_axis, window_len = 11, window_f="hanning"): """ Algorithm to find zero crossings. Smooths the curve and finds the zero-crossings by looking for a sign change. keyword arguments: y_axis -- A list containing the signal over which to find zero-crossings window_len -- the dimension of the smoothing window; should be an odd integer (default: 11) window_f -- the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman' (default: 'hanning') return: the index for each zero-crossing """ # smooth the curve length = len(y_axis) x_axis = np.asarray(range(length), int) # discard tail of smoothed signal y_axis = _smooth(y_axis, window_len, window_f)[:length] zero_crossings = np.where(np.diff(np.sign(y_axis)))[0] indices = [x_axis[index] for index in zero_crossings] @@ -645,12 +764,12 @@ def zero_crossings(y_axis, window = 11): if diff.std() / diff.mean() > 0.2: print diff.std() / diff.mean() print np.diff(indices) raise ValueError( "False zero-crossings found, indicates problem {0!s} or {1!s}".format( "with smoothing window", "problem with offset")) # check if any zero crossings were found if len(zero_crossings) < 1: raise ValueError("No zero crossings found") return indices # used this to test the fft function's sensitivity to spectral leakage @@ -669,7 +788,114 @@ def zero_crossings(y_axis, window = 11): ############################################################################## def zero_crossings_sine_fit(y_axis, x_axis, fit_window = None, smooth_window = 11): """ Detects the zero crossings of a signal by fitting a sine model function around the zero crossings: y = A * sin(2 * pi * Hz * (x - tau)) + k * x + m Only tau (the zero crossing) is varied during fitting. Offset and a linear drift of offset is accounted for by fitting a linear function the negative respective positive raw peaks of the wave-shape and the amplitude is calculated using data from the offset calculation i.e. the 'm' constant from the negative peaks is subtracted from the positive one to obtain amplitude. Frequency is calculated using the mean time between raw peaks. Algorithm seems to be sensitive to first guess e.g. a large smooth_window will give an error in the results. keyword arguments: y_axis -- A list containing the signal over which to find peaks x_axis -- A x-axis whose values correspond to the y_axis list and is used in the return to specify the position of the peaks. If omitted an index of the y_axis is used. (default: None) fit_window -- Number of points around the approximate zero crossing that should be used when fitting the sine wave. Must be small enough that no other zero crossing will be seen. If set to none then the mean distance between zero crossings will be used (default: None) smooth_window -- the dimension of the smoothing window; should be an odd integer (default: 11) return: A list containing the positions of all the zero crossings. """ # check input data x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) #get first guess zero_indices = zero_crossings(y_axis, window_len = smooth_window) #modify fit_window to show distance per direction if fit_window == None: fit_window = np.diff(zero_indices).mean() // 3 else: fit_window = fit_window // 2 #x_axis is a np array, use the indices to get a subset with zero crossings approx_crossings = x_axis[zero_indices] #get raw peaks for calculation of offsets and frequency raw_peaks = peakdetect_zero_crossing(y_axis, x_axis) #Use mean time between peaks for frequency ext = lambda x: list(zip(*x)[0]) _diff = map(np.diff, map(ext, raw_peaks)) Hz = 1 / np.mean(map(np.mean, _diff)) #Hz = 1 / np.diff(approx_crossings).mean() #probably bad precision #offset model function offset_func = lambda x, k, m: k * x + m k = [] m = [] amplitude = [] for peaks in raw_peaks: #get peak data as nparray x_data, y_data = map(np.asarray, zip(*peaks)) #x_data = np.asarray(x_data) #y_data = np.asarray(y_data) #calc first guess A = np.mean(y_data) p0 = (0, A) popt, pcov = curve_fit(offset_func, x_data, y_data, p0) #append results k.append(popt[0]) m.append(popt[1]) amplitude.append(abs(A)) #store offset constants p_offset = (np.mean(k), np.mean(m)) A = m[0] - m[1] #define model function to fit to zero crossing #y = A * sin(2*pi * Hz * (x - tau)) + k * x + m func = lambda x, tau: A * np.sin(2 * pi * Hz * (x - tau)) + offset_func(x, *p_offset) #get true crossings true_crossings = [] for indice, crossing in zip(zero_indices, approx_crossings): p0 = (crossing, ) subset_start = max(indice - fit_window, 0.0) subset_end = min(indice + fit_window + 1, len(x_axis) - 1.0) x_subset = np.asarray(x_axis[subset_start:subset_end]) y_subset = np.asarray(y_axis[subset_start:subset_end]) #fit popt, pcov = curve_fit(func, x_subset, y_subset, p0) true_crossings.append(popt[0]) return true_crossings def _test_zero(): @@ -694,16 +920,35 @@ def _test_graph(): plot = pylab.plot(x,y) pylab.hold(True) pylab.plot(xm, ym, "r+") pylab.plot(xn, yn, "g+") _max, _min = peak_det_bad.peakdetect(y, 0.7, x) xm = [p[0] for p in _max] ym = [p[1] for p in _max] xn = [p[0] for p in _min] yn = [p[1] for p in _min] pylab.plot(xm, ym, "y*") pylab.plot(xn, yn, "k*") pylab.show() def _test_graph_cross(window = 11): i = 10000 x = np.linspace(0,8.7*pi,i) y = (2*np.sin(x) + 0.006 * np.random.randn(i)) y *= -1 pylab.plot(x,y) #pylab.show() crossings = zero_crossings_sine_fit(y,x, smooth_window = window) y_cross = [0] * len(crossings) plot = pylab.plot(x,y) pylab.hold(True) pylab.plot(crossings, y_cross, "b+") pylab.show() @@ -726,8 +971,8 @@ def _test_graph(): plot = pylab.plot(x, y) pylab.hold(True) pylab.plot(xm, ym, "r+") pylab.plot(xn, yn, "g+") pylab.show() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,45 @@ Musings about the peakdetect function by Sixten Bergman All the peak detection function in __all__ of peakdetect.py will work on consistent waveforms, but only peakdetect.peakdetect can properly handle offsets. The most accurate method for pure sine seems to be peakdetect_parabola, which for a 50Hz sine wave lasting 0.1s with 10k samples has an error in the order of 1e-10, whilst a naive most extreme sample will have an error in the order of 7e-5 for the position and 4e-7 for the amplitude Do note that this accuracy most likely doesn't stay true for any real world data where you'll have noise and harmonics in the signal which may produce errors in the functions, which may be smaller or larger then the error of naively using the highest/lowest point in a local maxima/minima. The sine fit function seem to perform even worse than a just retrieving the highest or lowest data point and is as such not recommended. The reason for this as far as I can tell is that the scipy.optimize.curve_fit can't optimize the variables. For parabola fit to function well it must be fitted to a small section of the peak as the curvature will start to mismatch with the function, but this also means that parabola should be quite sensitive to noise FFT interpolation has between 0 to 2 orders of magnitude improvement over a raw peak fit. To obtain this improvement the wave needs to be heavily padded in length Spline seems to have similar performance to a FFT interpolation of the time domain. Spline does however seem to be better at estimating amplitude than the FFT method, but is unknown if this will hold true for wave-shapes that are noisy. Automatic tests for sine fitted peak detection is disabled due to it's problems Avoid using the following function as they're questionable in performance: peakdetect_sine peakdetect_sine_locked This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,467 @@ #!/usr/bin/python2 # Copyright (C) 2016 Sixten Bergman # License WTFPL # # This program is free software. It comes without any warranty, to the extent # permitted by applicable law. # You can redistribute it and/or modify it under the terms of the Do What The # Fuck You Want To Public License, Version 2, as published by Sam Hocevar. See # http://www.wtfpl.net/ for more details. # import analytic_wfm import numpy as np import peakdetect import unittest import pdb #generate time axis for 5 cycles @ 50 Hz linspace_standard = np.linspace(0, 0.10, 1000) linspace_peakdetect = np.linspace(0, 0.10, 10000) def _write_log(file, header, message): with open(file, 'ab') as f: f.write(header) f.write('\n') f.writelines(message) f.write('\n') f.write('\n') def _calculate_missmatch(received, expected): """ Calculates the mean mismatch between received and expected data keyword arguments: received -- [[time of peak], [ampl of peak]] expected -- [[time of peak], [ampl of peak]] return (time mismatch, ampl mismatch) """ #t_diff = np.abs(np.asarray(received[0]) - expected[0]) t_diff = np.asarray(received[0]) - expected[0] a_diff = np.abs(np.asarray(received[1]) - expected[1]) #t_diff /= np.abs(expected[0]) time error in absolute terms a_diff /= np.abs(expected[1]) return (t_diff, a_diff) def _log_diff(t_max, y_max, t_min, y_min, t_max_expected, y_max_expected, t_min_expected, y_min_expected, file, name ): """ """ t_diff_h, a_diff_h = _calculate_missmatch([t_max, y_max], [t_max_expected, y_max_expected]) t_diff_l, a_diff_l = _calculate_missmatch([t_min, y_min], [t_min_expected, y_min_expected]) #data = ['\t{0:.2e}\t{1:.2e}\t{2:.2e}\t{3:.2e}'.format(*d) for d in # [t_diff_h, t_diff_l, a_diff_h, a_diff_l] # ] data = ['\t{0}'.format('\t'.join(map('{0:.2e}'.format, d))) for d in [t_diff_h, t_diff_l, a_diff_h, a_diff_l] ] _write_log(file, name, '\n'.join(data)) def _is_close(max_p, min_p, expected_max, expected_min, atol_time, tol_ampl, file, name): """ Determines if the peaks are within the given tolerance keyword arguments: max_p -- location and value of maxima min_p -- location and value of minima expected_max -- expected location and value of maxima expected_min -- expected location and value of minima atol_time -- absolute tolerance of location of vertex tol_ampl -- relative tolerance of value of vertex """ if len(max_p) == 5: t_max_expected, y_max_expected = zip(*expected_max) else: t_max_expected, y_max_expected = zip(*expected_max[1:]) if len(min_p) == 5: t_min_expected, y_min_expected = zip(*expected_min) else: t_min_expected, y_min_expected = zip(*expected_min[:-1]) t_max, y_max = zip(*max_p) t_min, y_min = zip(*min_p) t_max_close = np.isclose(t_max, t_max_expected, atol=atol_time, rtol=1e-12) y_max_close = np.isclose(y_max, y_max_expected, tol_ampl) t_min_close = np.isclose(t_min, t_min_expected, atol=atol_time, rtol=1e-12) y_min_close = np.isclose(y_min, y_min_expected, tol_ampl) _log_diff(t_max, y_max, t_min, y_min, t_max_expected, y_max_expected, t_min_expected, y_min_expected, file, name) return(t_max_close, y_max_close, t_min_close, y_min_close) class Test_analytic_wfm(unittest.TestCase): def test_ACV1(self): #compare with previous lambda implementation old = analytic_wfm._ACV_A1_L(linspace_standard) acv = analytic_wfm.ACV_A1(linspace_standard) self.assertTrue(np.allclose(acv, old, rtol=1e-9)) def test_ACV2(self): #compare with previous lambda implementation old = analytic_wfm._ACV_A2_L(linspace_standard) acv = analytic_wfm.ACV_A2(linspace_standard) self.assertTrue(np.allclose(acv, old, rtol=1e-9)) def test_ACV3(self): #compare with previous lambda implementation old = analytic_wfm._ACV_A3_L(linspace_standard) acv = analytic_wfm.ACV_A3(linspace_standard) self.assertTrue(np.allclose(acv, old, rtol=1e-9)) def test_ACV4(self): #compare with previous lambda implementation old = analytic_wfm._ACV_A4_L(linspace_standard) acv = analytic_wfm.ACV_A4(linspace_standard) self.assertTrue(np.allclose(acv, old, rtol=1e-9)) def test_ACV5(self): #compare with previous lambda implementation old = analytic_wfm._ACV_A5_L(linspace_standard) acv = analytic_wfm.ACV_A5(linspace_standard) self.assertTrue(np.allclose(acv, old, rtol=1e-9)) def test_ACV6(self): #compare with previous lambda implementation old = analytic_wfm._ACV_A6_L(linspace_standard) acv = analytic_wfm.ACV_A6(linspace_standard) self.assertTrue(np.allclose(acv, old, rtol=1e-9)) def test_ACV7(self): num = np.linspace(0, 20, 1000) old = analytic_wfm._ACV_A7_OLD(num) acv = analytic_wfm.ACV_A7(num) self.assertTrue(np.allclose(acv, old, rtol=1e-9)) def test_ACV8(self): num = np.linspace(0, 3150, 10000) old = analytic_wfm._ACV_A8_OLD(num) acv = analytic_wfm.ACV_A8(num) self.assertTrue(np.allclose(acv, old, rtol=1e-9)) class _Test_peakdetect_template(unittest.TestCase): func = None file = "Mismatch data.txt" name = "template" args = [] kwargs = {} msg_t = "Time of {0!s} not within tolerance:\n\t{1}" msg_y = "Amplitude of {0!s} not within tolerance:\n\t{1}" def _test_peak_template(self, waveform, expected_max, expected_min, wav_name, atol_time = 1e-5, tol_ampl = 1e-5): """ keyword arguments: waveform -- a function that given x can generate a test waveform expected_max -- position and amplitude where maxima are expected expected_min -- position and amplitude where minima are expected wav_name -- Name of the test waveform atol_time -- absolute tolerance for position of vertex (default: 1e-5) tol_ampl -- relative tolerance for position of vertex (default: 1e-5) """ y = waveform(linspace_peakdetect) max_p, min_p = self.func(y, linspace_peakdetect, *self.args, **self.kwargs ) #check if the correct amount of peaks were discovered self.assertIn(len(max_p), [4,5]) self.assertIn(len(min_p), [4,5]) # # check if position and amplitude is within 0.001% which is approx the # numeric uncertainty from the amount of samples used # t_max_close, y_max_close, t_min_close, y_min_close = _is_close(max_p, min_p, expected_max, expected_min, atol_time, tol_ampl, self.file, "{0}: {1}".format(wav_name, self.name)) #assert if values are outside of tolerance self.assertTrue(np.all(t_max_close), msg=self.msg_t.format("maxima", t_max_close)) self.assertTrue(np.all(y_max_close), msg=self.msg_y.format("maxima", y_max_close)) self.assertTrue(np.all(t_min_close), msg=self.msg_t.format("minima", t_min_close)) self.assertTrue(np.all(y_min_close), msg=self.msg_y.format("minima", y_min_close)) def test_peak_ACV1(self): peak_pos = 1000*np.sqrt(2) #1414.2135623730951 peak_neg = -peak_pos expected_max = [ [0.005, peak_pos], [0.025, peak_pos], [0.045, peak_pos], [0.065, peak_pos], [0.085, peak_pos] ] expected_min = [ (0.015, peak_neg), (0.035, peak_neg), (0.055, peak_neg), (0.075, peak_neg), (0.095, peak_neg) ] atol_time = 1e-5 tol_ampl = 1e-6 self._test_peak_template(analytic_wfm.ACV_A1, expected_max, expected_min, "ACV1", atol_time, tol_ampl ) def _test_peak_ACV2(self): peak_pos = 1000*np.sqrt(2) + 500 #1414.2135623730951 + 500 peak_neg = (-1000*np.sqrt(2)) + 500 #-914.2135623730951 expected_max = [ [0.005, peak_pos], [0.025, peak_pos], [0.045, peak_pos], [0.065, peak_pos], [0.085, peak_pos] ] expected_min = [ (0.015, peak_neg), (0.035, peak_neg), (0.055, peak_neg), (0.075, peak_neg), (0.095, peak_neg) ] atol_time = 1e-5 tol_ampl = 1e-6 self._test_peak_template(analytic_wfm.ACV_A2, expected_max, expected_min, "ACV2", atol_time, tol_ampl ) def test_peak_ACV3(self): """ WolframAlpha solution max{y = sin(100 pi x)+0.05 sin(400 pi x+(2 pi)/3)}~~ sin(6.28319 n+1.51306)-0.05 sin(25.1327 n+5.00505) at x~~0.00481623+0.02 n for integer n min{y = sin(100 pi x)+0.05 sin(400 pi x+(2 pi)/3)}~~ 0.05 sin(6.55488-25.1327 n)-sin(1.37692-6.28319 n) at x~~-0.00438287+0.02 n for integer n Derivative for 50 Hz in 2 alternative forms y = 100pi*cos(100pi*x) - 25pi*cos(400pi*x)-0.3464*50*pi*sin(400pi*x) y = 100pi*cos(100pi*x) + 20pi*cos(400pi*x + 2*pi/3) root 0 = 1/(50 * pi) * (pi*0 - 0.68846026579266880983) The exact solution according to WolframAlpha - I haven't the foggiest (tan^(-1)(root of {#1^2-3&, 11 #2^8-8 #1 #2^7-8 #2^6+56 #1 #2^5+70 #2^4-56 #1 #2^3-48 #2^2+8 #1 #2-9&}(x) near x = -0.822751)+pi n) / (50 * pi) root 1 = 1/(50 * pi) * (pi*0 + 0.75653155241276430710) period = 0.02 """ base = 1000*np.sqrt(2) #def peak_pos(n): # return base * (np.sin(6.28319 * n + 1.51306) # -0.05*np.sin(25.1327 * n + 5.00505)) #def peak_neg(n): # return base * (0.05 * np.sin(6.55488 - 25.1327 * n) # - np.sin(1.37692 - 6.28319 * n)) def peak_pos(n): return base * (np.sin(2*np.pi * n + 1.51306) -0.05*np.sin(8*np.pi * n + 5.00505)) def peak_neg(n): return base * (0.05 * np.sin(6.55488 - 8*np.pi * n) - np.sin(1.37692 - 2*np.pi * n)) t_max = [ 0.75653155241276430710/(50*np.pi)+0.00,#0.004816229446859069 0.75653155241276430710/(50*np.pi)+0.02,#0.024816229446859069 0.75653155241276430710/(50*np.pi)+0.04,#0.044816229446859069 0.75653155241276430710/(50*np.pi)+0.06,#0.064816229446859069 0.75653155241276430710/(50*np.pi)+0.08 #0.084816229446859069 ] t_min = [ -0.68846026579266880983/(50*np.pi)+0.02,#0.015617125823069466 -0.68846026579266880983/(50*np.pi)+0.04,#0.035617125823069466 -0.68846026579266880983/(50*np.pi)+0.06,#0.055617125823069466 -0.68846026579266880983/(50*np.pi)+0.08,#0.075617125823069466 -0.68846026579266880983/(50*np.pi)+0.10 #0.095617125823069466 ] expected_max = [ (t_max[0], analytic_wfm.ACV_A3(t_max[0])), (t_max[1], analytic_wfm.ACV_A3(t_max[1])), (t_max[2], analytic_wfm.ACV_A3(t_max[2])), (t_max[3], analytic_wfm.ACV_A3(t_max[3])), (t_max[4], analytic_wfm.ACV_A3(t_max[4])), ] expected_min = [ (t_min[0], analytic_wfm.ACV_A3(t_min[0])), (t_min[1], analytic_wfm.ACV_A3(t_min[1])), (t_min[2], analytic_wfm.ACV_A3(t_min[2])), (t_min[3], analytic_wfm.ACV_A3(t_min[3])), (t_min[4], analytic_wfm.ACV_A3(t_min[4])), ] atol_time = 1e-5 tol_ampl = 2e-6 #reduced tolerance since the expected values are only approximated self._test_peak_template(analytic_wfm.ACV_A3, expected_max, expected_min, "ACV3", atol_time, tol_ampl ) class Test_peakdetect(_Test_peakdetect_template): name = "peakdetect" def __init__(self, *args, **kwargs): super(Test_peakdetect, self).__init__(*args, **kwargs) self.func = peakdetect.peakdetect class Test_peakdetect_fft(_Test_peakdetect_template): name = "peakdetect_fft" def __init__(self, *args, **kwargs): super(Test_peakdetect_fft, self).__init__(*args, **kwargs) self.func = peakdetect.peakdetect_fft class Test_peakdetect_parabola(_Test_peakdetect_template): name = "peakdetect_parabola" def __init__(self, *args, **kwargs): super(Test_peakdetect_parabola, self).__init__(*args, **kwargs) self.func = peakdetect.peakdetect_parabola class Test_peakdetect_sine(_Test_peakdetect_template): name = "peakdetect_sine" def __init__(self, *args, **kwargs): super(Test_peakdetect_sine, self).__init__(*args, **kwargs) self.func = peakdetect.peakdetect_sine class Test_peakdetect_sine_locked(_Test_peakdetect_template): name = "peakdetect_sine_locked" def __init__(self, *args, **kwargs): super(Test_peakdetect_sine_locked, self).__init__(*args, **kwargs) self.func = peakdetect.peakdetect_sine_locked class Test_peakdetect_spline(_Test_peakdetect_template): name = "peakdetect_spline" def __init__(self, *args, **kwargs): super(Test_peakdetect_spline, self).__init__(*args, **kwargs) self.func = peakdetect.peakdetect_spline class Test_peakdetect_zero_crossing(_Test_peakdetect_template): name = "peakdetect_zero_crossing" def __init__(self, *args, **kwargs): super(Test_peakdetect_zero_crossing, self).__init__(*args, **kwargs) self.func = peakdetect.peakdetect_zero_crossing class Test_peakdetect_misc(unittest.TestCase): def test__pad(self): data = [1,2,3,4,5,6,5,4,3,2,1] pad_len = 2 pad = lambda x, c: x[:len(x) // 2] + [0] * c + x[len(x) // 2:] expected = pad(list(data), 2 ** peakdetect._n(len(data) * pad_len) - len(data)) received = peakdetect._pad(data, pad_len) self.assertListEqual(received, expected) def test__n(self): self.assertEqual(2**peakdetect._n(1000), 1024) #class zero_crossings(unittest.TestCase): if __name__ == '__main__': tests_to_run = [ Test_analytic_wfm, Test_peakdetect, Test_peakdetect_parabola, Test_peakdetect_fft, #Test_peakdetect_sine, #sine tests disabled pending rework #Test_peakdetect_sine_locked, Test_peakdetect_spline, Test_peakdetect_zero_crossing, Test_peakdetect_misc ] suites_list = [unittest.TestLoader().loadTestsFromTestCase(test_class) for test_class in tests_to_run] big_suite = unittest.TestSuite(suites_list) unittest.TextTestRunner(verbosity=2).run(big_suite) -
sixtenbe revised this gist
Mar 1, 2012 . 1 changed file with 13 additions and 9 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -133,7 +133,7 @@ def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0): #Maxima peak candidate found #look ahead in signal to ensure that this is a peak and not jitter if y_axis[index:index+lookahead].max() < mx: max_peaks.append([mxpos, mx]) dump.append(True) #set algorithm to only find minima now mx = np.Inf @@ -151,7 +151,7 @@ def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0): #Minima peak candidate found #look ahead in signal to ensure that this is a peak and not jitter if y_axis[index:index+lookahead].min() > mn: min_peaks.append([mnpos, mn]) dump.append(False) #set algorithm to only find maxima now mn = -np.Inf @@ -175,7 +175,7 @@ def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0): #no peaks were found, should the function return empty lists? pass return [max_peaks, min_peaks] def peakdetect_fft(y_axis, x_axis, pad_len = 5): @@ -234,7 +234,7 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 5): fft_padded = padd(list(fft_data), 2 ** n(len(fft_data) * pad_len) - len(fft_data)) # There is amplitude decrease directly proportional to the sample increase sf = len(fft_padded) / float(len(fft_data)) # There might be a leakage giving the result an imaginary component # Return only the real component @@ -243,7 +243,8 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 5): x_axis[zero_indices[0]], x_axis[zero_indices[last_indice]], len(y_axis_ifft)) # get the peaks to the interpolated waveform max_peaks, min_peaks = peakdetect(y_axis_ifft, x_axis_ifft, 500, delta = abs(np.diff(y_axis).max() * 2)) #max_peaks, min_peaks = peakdetect_zero_crossing(y_axis_ifft, x_axis_ifft) # store one 20th of a period as waveform data @@ -257,8 +258,10 @@ def peakdetect_fft(y_axis, x_axis, pad_len = 5): index = 0 for peak in peaks: index = np.where(x_axis_ifft[index:]==peak[0])[0][0] + index x_fit_lim = x_axis_ifft[index - data_len // 2: index + data_len // 2 + 1] y_fit_lim = y_axis_ifft[index - data_len // 2: index + data_len // 2 + 1] peak_fit_tmp.append([x_fit_lim, y_fit_lim]) fitted_wave.append(peak_fit_tmp) @@ -404,7 +407,8 @@ def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False): if lock_frequency: func = lambda x, A, tau: A * np.sin(2 * pi * Hz * (x - tau) + pi / 2) else: func = lambda x, A, Hz, tau: A * np.sin(2 * pi * Hz * (x - tau) + pi / 2) #func = lambda x, A, Hz, tau: A * np.cos(2 * pi * Hz * (x - tau)) @@ -552,7 +556,7 @@ def peakdetect_zero_crossing(y_axis, x_axis = None, window = 11): max_peaks = [[x, y] for x,y in zip(hi_peaks_x, hi_peaks)] min_peaks = [[x, y] for x,y in zip(lo_peaks_x, lo_peaks)] return [max_peaks, min_peaks] def _smooth(x, window_len=11, window='hanning'): -
sixtenbe revised this gist
Mar 1, 2012 . 1 changed file with 579 additions and 129 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,59 +1,126 @@ import numpy as np from math import pi, log import pylab from scipy import fft, ifft from scipy.optimize import curve_fit i = 10000 x = np.linspace(0, 3.5 * pi, i) y = (0.3*np.sin(x) + np.sin(1.3 * x) + 0.9 * np.sin(4.2 * x) + 0.06 * np.random.randn(i)) def _datacheck_peakdetect(x_axis, y_axis): if x_axis is None: x_axis = range(len(y_axis)) if len(y_axis) != len(x_axis): raise (ValueError, 'Input vectors y_axis and x_axis must have same length') #needs to be a numpy array y_axis = np.array(y_axis) x_axis = np.array(x_axis) return x_axis, y_axis def _peakdetect_parabole_fitter(raw_peaks, x_axis, y_axis, points): """ Performs the actual parabole fitting for the peakdetect_parabole function. keyword arguments: raw_peaks -- A list of either the maximium or the minimum peaks, as given by the peakdetect_zero_crossing function, with index used as x-axis x_axis -- A numpy list of all the x values y_axis -- A numpy list of all the y values points -- How many points around the peak should be used during curve fitting, must be odd. return -- A list giving all the peaks and the fitted waveform, format: [[x, y, [fitted_x, fitted_y]]] """ func = lambda x, k, tau, m: k * ((x - tau) ** 2) + m fitted_peaks = [] for peak in raw_peaks: index = peak[0] x_data = x_axis[index - points // 2: index + points // 2 + 1] y_data = y_axis[index - points // 2: index + points // 2 + 1] # get a first approximation of tau (peak position in time) tau = x_axis[index] # get a first approximation of peak amplitude m = peak[1] # build list of approximations # k = -m as first approximation? p0 = (-m, tau, m) popt, pcov = curve_fit(func, x_data, y_data, p0) # retrieve tau and m i.e x and y value of peak x, y = popt[1:3] # create a high resolution data set for the fitted waveform x2 = np.linspace(x_data[0], x_data[-1], points * 10) y2 = func(x2, *popt) fitted_peaks.append([x, y, [x2, y2]]) return fitted_peaks def peakdetect(y_axis, x_axis = None, lookahead = 300, delta=0): """ Converted from/based on a MATLAB script at: http://billauer.co.il/peakdet.html function for detecting local maximas and minmias in a signal. Discovers peaks by searching for values which are surrounded by lower or larger values for maximas and minimas respectively keyword arguments: y_axis -- A list containg the signal over which to find peaks x_axis -- (optional) A x-axis whose values correspond to the y_axis list and is used in the return to specify the postion of the peaks. If omitted an index of the y_axis is used. (default: None) lookahead -- (optional) distance to look ahead from a peak candidate to determine if it is the actual peak (default: 200) '(sample / period) / f' where '4 >= f >= 1.25' might be a good value delta -- (optional) this specifies a minimum difference between a peak and the following points, before a peak may be considered a peak. Useful to hinder the function from picking up false peaks towards to end of the signal. To work well delta should be set to delta >= RMSnoise * 5. (default: 0) delta function causes a 20% decrease in speed, when omitted Correctly used it can double the speed of the function return -- two lists [max_peaks, min_peaks] containing the positive and negative peaks respectively. Each cell of the lists contains a tupple of: (position, peak_value) to get the average peak value do: np.mean(max_peaks, 0)[1] on the results to unpack one of the lists into x, y coordinates do: x, y = zip(*tab) """ max_peaks = [] min_peaks = [] dump = [] #Used to pop the first hit which almost always is false # check input data x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) # store data length for later use length = len(y_axis) #perform some checks if lookahead < 1: raise ValueError, "Lookahead must be '1' or above in value" if not (np.isscalar(delta) and delta >= 0): raise ValueError, "delta must be a positive number" #maxima and minima candidates are temporarily stored in #mx and mn respectively mn, mx = np.Inf, -np.Inf #Only detect peak if there is 'lookahead' amount of points after it for index, (x, y) in enumerate(zip(x_axis[:-lookahead], y_axis[:-lookahead])): if y > mx: mx = y mxpos = x @@ -66,204 +133,552 @@ def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0): #Maxima peak candidate found #look ahead in signal to ensure that this is a peak and not jitter if y_axis[index:index+lookahead].max() < mx: max_peaks.append((mxpos, mx)) dump.append(True) #set algorithm to only find minima now mx = np.Inf mn = np.Inf if index+lookahead >= length: #end is within lookahead no more peaks can be found break continue #else: #slows shit down this does # mx = ahead # mxpos = x_axis[np.where(y_axis[index:index+lookahead]==mx)] ####look for min#### if y > mn+delta and mn != -np.Inf: #Minima peak candidate found #look ahead in signal to ensure that this is a peak and not jitter if y_axis[index:index+lookahead].min() > mn: min_peaks.append((mnpos, mn)) dump.append(False) #set algorithm to only find maxima now mn = -np.Inf mx = -np.Inf if index+lookahead >= length: #end is within lookahead no more peaks can be found break #else: #slows shit down this does # mn = ahead # mnpos = x_axis[np.where(y_axis[index:index+lookahead]==mn)] #Remove the false hit on the first value of the y_axis try: if dump[0]: max_peaks.pop(0) else: min_peaks.pop(0) del dump except IndexError: #no peaks were found, should the function return empty lists? pass return max_peaks, min_peaks def peakdetect_fft(y_axis, x_axis, pad_len = 5): """ Performs a FFT calculation on the data and zero-pads the results to increase the time domain resolution after performing the inverse fft and send the data to the 'peakdetect' function for peak detection. Omitting the x_axis is forbidden as it would make the resulting x_axis value silly if it was returned as the index 50.234 or similar. Will find at least 1 less peak then the 'peakdetect_zero_crossing' function, but should result in a more precise value of the peak as resolution has been increased. Some peaks are lost in an attempt to minimize spectral leakage by calculating the fft between two zero crossings for n amount of signal periods. The biggest time eater in this function is the ifft and thereafter it's the 'peakdetect' function which takes only half the time of the ifft. Speed improvementd could include to check if 2**n points could be used for fft and ifft or change the 'peakdetect' to the 'peakdetect_zero_crossing', which is maybe 10 times faster than 'peakdetct'. The pro of 'peakdetect' is that it resutls in one less lost peak. It should also be noted that the time used by the ifft function can change greatly depending on the input. keyword arguments: y_axis -- A list containg the signal over which to find peaks x_axis -- A x-axis whose values correspond to the y_axis list and is used in the return to specify the postion of the peaks. pad_len -- (optional) By how many times the time resolution should be increased by, e.g. 1 doubles the resolution. The amount is rounded up to the nearest 2 ** n amount (default: 5) return -- two lists [max_peaks, min_peaks] containing the positive and negative peaks respectively. Each cell of the lists contains a tupple of: (position, peak_value) to get the average peak value do: np.mean(max_peaks, 0)[1] on the results to unpack one of the lists into x, y coordinates do: x, y = zip(*tab) """ # check input data x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) zero_indices = zero_crossings(y_axis, window = 11) #select a n amount of periods last_indice = - 1 - (1 - len(zero_indices) & 1) # Calculate the fft between the first and last zero crossing # this method could be ignored if the begining and the end of the signal # are discardable as any errors induced from not using whole periods # should mainly manifest in the beginning and the end of the signal, but # not in the rest of the signal fft_data = fft(y_axis[zero_indices[0]:zero_indices[last_indice]]) padd = lambda x, c: x[:len(x) // 2] + [0] * c + x[len(x) // 2:] n = lambda x: int(log(x)/log(2)) + 1 # padds to 2**n amount of samples fft_padded = padd(list(fft_data), 2 ** n(len(fft_data) * pad_len) - len(fft_data)) # amplitude of ifft is decreased, get sf to return to original amplitude sf = len(fft_padded) / float(len(fft_data)) # There might be a leakage giving the result an imaginary component # Return only the real component y_axis_ifft = ifft(fft_padded).real * sf #(pad_len + 1) x_axis_ifft = np.linspace( x_axis[zero_indices[0]], x_axis[zero_indices[last_indice]], len(y_axis_ifft)) # get the peaks to the interpolated waveform max_peaks, min_peaks = peakdetect(y_axis_ifft, x_axis_ifft, 500, delta = abs(np.diff(y_axis).max() * 2)) #max_peaks, min_peaks = peakdetect_zero_crossing(y_axis_ifft, x_axis_ifft) # store one 20th of a period as waveform data data_len = int(np.diff(zero_indices).mean()) / 10 data_len += 1 - data_len & 1 fitted_wave = [] for peaks in [max_peaks, min_peaks]: peak_fit_tmp = [] index = 0 for peak in peaks: index = np.where(x_axis_ifft[index:]==peak[0])[0][0] + index x_fit_lim = x_axis_ifft[index - data_len // 2: index + data_len // 2 + 1] y_fit_lim = y_axis_ifft[index - data_len // 2: index + data_len // 2 + 1] peak_fit_tmp.append([x_fit_lim, y_fit_lim]) fitted_wave.append(peak_fit_tmp) #pylab.plot(range(len(fft_data)), fft_data) #pylab.show() pylab.plot(x_axis, y_axis) pylab.hold(True) pylab.plot(x_axis_ifft, y_axis_ifft) #for max_p in max_peaks: # pylab.plot(max_p[0], max_p[1], 'xr') pylab.show() return [max_peaks, min_peaks] def peakdetect_parabole(y_axis, x_axis, points = 9): """ Function for detecting local maximas and minmias in a signal. Discovers peaks by fitting the model function: y = k (x - tau) ** 2 + m to the peaks. The amount of points used in the fitting is set by the points argument. Omitting the x_axis is forbidden as it would make the resulting x_axis value silly if it was returned as index 50.234 or similar. will find the same amount of peaks as the 'peakdetect_zero_crossing' function, but might result in a more precise value of the peak. keyword arguments: y_axis -- A list containg the signal over which to find peaks x_axis -- A x-axis whose values correspond to the y_axis list and is used in the return to specify the postion of the peaks. points -- (optional) How many points around the peak should be used during curve fitting, must be odd (default: 9) return -- two lists [max_peaks, min_peaks] containing the positive and negative peaks respectively. Each cell of the lists contains a list of: (position, peak_value) to get the average peak value do: np.mean(max_peaks, 0)[1] on the results to unpack one of the lists into x, y coordinates do: x, y = zip(*max_peaks) """ # check input data x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) # make the points argument odd points += 1 - points % 2 #points += 1 - int(points) & 1 slower when int conversion needed # get raw peaks max_raw, min_raw = peakdetect_zero_crossing(y_axis) # define output variable max_peaks = [] min_peaks = [] max_ = _peakdetect_parabole_fitter(max_raw, x_axis, y_axis, points) min_ = _peakdetect_parabole_fitter(min_raw, x_axis, y_axis, points) max_peaks = map(lambda x: [x[0], x[1]], max_) max_fitted = map(lambda x: x[-1], max_) min_peaks = map(lambda x: [x[0], x[1]], min_) min_fitted = map(lambda x: x[-1], min_) #pylab.plot(x_axis, y_axis) #pylab.hold(True) #for max_p, max_f in zip(max_peaks, max_fitted): # pylab.plot(max_p[0], max_p[1], 'x') # pylab.plot(max_f[0], max_f[1], 'o', markersize = 2) #for min_p, min_f in zip(min_peaks, min_fitted): # pylab.plot(min_p[0], min_p[1], 'x') # pylab.plot(min_f[0], min_f[1], 'o', markersize = 2) #pylab.show() return [max_peaks, min_peaks] def peakdetect_sine(y_axis, x_axis, points = 9, lock_frequency = False): """ Function for detecting local maximas and minmias in a signal. Discovers peaks by fitting the model function: y = A * sin(2 * pi * f * x - tau) to the peaks. The amount of points used in the fitting is set by the points argument. Omitting the x_axis is forbidden as it would make the resulting x_axis value silly if it was returned as index 50.234 or similar. will find the same amount of peaks as the 'peakdetect_zero_crossing' function, but might result in a more precise value of the peak. The function might have some problems if the sine wave has a non-negligible total angle i.e. a k*x component, as this messes with the internal offset calculation of the peaks, might be fixed by fitting a k * x + m function to the peaks for offset calculation. keyword arguments: y_axis -- A list containg the signal over which to find peaks x_axis -- A x-axis whose values correspond to the y_axis list and is used in the return to specify the postion of the peaks. points -- (optional) How many points around the peak should be used during curve fitting, must be odd (default: 9) lock_frequency -- (optional) Specifies if the frequency argument of the model function should be locked to the value calculated from the raw peaks or if optimization process may tinker with it. (default: False) return -- two lists [max_peaks, min_peaks] containing the positive and negative peaks respectively. Each cell of the lists contains a tupple of: (position, peak_value) to get the average peak value do: np.mean(max_peaks, 0)[1] on the results to unpack one of the lists into x, y coordinates do: x, y = zip(*tab) """ # check input data x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) # make the points argument odd points += 1 - points % 2 #points += 1 - int(points) & 1 slower when int conversion needed # get raw peaks max_raw, min_raw = peakdetect_zero_crossing(y_axis) # define output variable max_peaks = [] min_peaks = [] # get global offset offset = np.mean([np.mean(max_raw, 0)[1], np.mean(min_raw, 0)[1]]) # fitting a k * x + m function to the peaks might be better #offset_func = lambda x, k, m: k * x + m # calculate an approximate frequenzy of the signal Hz = [] for raw in [max_raw, min_raw]: if len(raw) > 1: peak_pos = [x_axis[index] for index in zip(*raw)[0]] Hz.append(np.mean(np.diff(peak_pos))) Hz = 1 / np.mean(Hz) # model function # if cosine is used then tau could equal the x position of the peak # if sine were to be used then tau would be the first zero crossing if lock_frequency: func = lambda x, A, tau: A * np.sin(2 * pi * Hz * (x - tau) + pi / 2) else: func = lambda x, A, Hz, tau: A * np.sin(2 * pi * Hz * (x - tau) + pi / 2) #func = lambda x, A, Hz, tau: A * np.cos(2 * pi * Hz * (x - tau)) #get peaks fitted_peaks = [] for raw_peaks in [max_raw, min_raw]: peak_data = [] for peak in raw_peaks: index = peak[0] x_data = x_axis[index - points // 2: index + points // 2 + 1] y_data = y_axis[index - points // 2: index + points // 2 + 1] # get a first approximation of tau (peak position in time) tau = x_axis[index] # get a first approximation of peak amplitude A = peak[1] # build list of approximations if lock_frequency: p0 = (A, tau) else: p0 = (A, Hz, tau) # subtract offset from waveshape y_data -= offset popt, pcov = curve_fit(func, x_data, y_data, p0) # retrieve tau and A i.e x and y value of peak x = popt[-1] y = popt[0] # create a high resolution data set for the fitted waveform x2 = np.linspace(x_data[0], x_data[-1], points * 10) y2 = func(x2, *popt) # add the offset to the results y += offset y2 += offset y_data += offset peak_data.append([x, y, [x2, y2]]) fitted_peaks.append(peak_data) # structure date for output max_peaks = map(lambda x: [x[0], x[1]], fitted_peaks[0]) max_fitted = map(lambda x: x[-1], fitted_peaks[0]) min_peaks = map(lambda x: [x[0], x[1]], fitted_peaks[1]) min_fitted = map(lambda x: x[-1], fitted_peaks[1]) #pylab.plot(x_axis, y_axis) #pylab.hold(True) #for max_p, max_f in zip(max_peaks, max_fitted): # pylab.plot(max_p[0], max_p[1], 'x') # pylab.plot(max_f[0], max_f[1], 'o', markersize = 2) #for min_p, min_f in zip(min_peaks, min_fitted): # pylab.plot(min_p[0], min_p[1], 'x') # pylab.plot(min_f[0], min_f[1], 'o', markersize = 2) #pylab.show() return [max_peaks, min_peaks] def peakdetect_sine_locked(y_axis, x_axis, points = 9): """ Convinience function for calling the 'peakdetect_sine' function with the lock_frequency argument as True. keyword arguments: y_axis -- A list containg the signal over which to find peaks x_axis -- A x-axis whose values correspond to the y_axis list and is used in the return to specify the postion of the peaks. points -- (optional) How many points around the peak should be used during curve fitting, must be odd (default: 9) return -- see 'peakdetect_sine' """ return peakdetect_sine(y_axis, x_axis, points, True) def peakdetect_zero_crossing(y_axis, x_axis = None, window = 11): """ Function for detecting local maximas and minmias in a signal. Discovers peaks by dividing the signal into bins and retrieving the maximum and minimum value of each the even and odd bins respectively. Division into bins is performed by smoothing the curve and finding the zero crossings. Suitable for repeatable signals, where some noise is tolerated. Excecutes faster than 'peakdetect', although this function will break if the offset of the signal is too large. It should also be noted that the first and last peak will probably not be found, as this function only can find peaks between the first and last zero crossing. keyword arguments: y_axis -- A list containg the signal over which to find peaks x_axis -- (optional) A x-axis whose values correspond to the y_axis list and is used in the return to specify the postion of the peaks. If omitted an index of the y_axis is used. (default: None) window -- the dimension of the smoothing window; should be an odd integer (default: 11) return -- two lists [max_peaks, min_peaks] containing the positive and negative peaks respectively. Each cell of the lists contains a tupple of: (position, peak_value) to get the average peak value do: np.mean(max_peaks, 0)[1] on the results to unpack one of the lists into x, y coordinates do: x, y = zip(*tab) """ # check input data x_axis, y_axis = _datacheck_peakdetect(x_axis, y_axis) zero_indices = zero_crossings(y_axis, window = window) period_lengths = np.diff(zero_indices) bins_y = [y_axis[index:index + diff] for index, diff in zip(zero_indices, period_lengths)] bins_x = [x_axis[index:index + diff] for index, diff in zip(zero_indices, period_lengths)] even_bins_y = bins_y[::2] odd_bins_y = bins_y[1::2] even_bins_x = bins_x[::2] odd_bins_x = bins_x[1::2] hi_peaks_x = [] lo_peaks_x = [] #check if even bin contains maxima if abs(even_bins_y[0].max()) > abs(even_bins_y[0].min()): hi_peaks = [bin.max() for bin in even_bins_y] lo_peaks = [bin.min() for bin in odd_bins_y] # get x values for peak for bin_x, bin_y, peak in zip(even_bins_x, even_bins_y, hi_peaks): hi_peaks_x.append(bin_x[np.where(bin_y==peak)[0][0]]) for bin_x, bin_y, peak in zip(odd_bins_x, odd_bins_y, lo_peaks): lo_peaks_x.append(bin_x[np.where(bin_y==peak)[0][0]]) else: hi_peaks = [bin.max() for bin in odd_bins_y] lo_peaks = [bin.min() for bin in even_bins_y] # get x values for peak for bin_x, bin_y, peak in zip(odd_bins_x, odd_bins_y, hi_peaks): hi_peaks_x.append(bin_x[np.where(bin_y==peak)[0][0]]) for bin_x, bin_y, peak in zip(even_bins_x, even_bins_y, lo_peaks): lo_peaks_x.append(bin_x[np.where(bin_y==peak)[0][0]]) max_peaks = [[x, y] for x,y in zip(hi_peaks_x, hi_peaks)] min_peaks = [[x, y] for x,y in zip(lo_peaks_x, lo_peaks)] return max_peaks, min_peaks def _smooth(x, window_len=11, window='hanning'): """ smooth the data using a window of the requested size. This method is based on the convolution of a scaled window on the signal. The signal is prepared by introducing reflected copies of the signal (with the window size) in both ends so that transient parts are minimized in the begining and end part of the output signal. input: x: the input signal window_len: the dimension of the smoothing window; should be an odd integer window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman' flat window will produce a moving average smoothing. output: the smoothed signal example: t = linspace(-2,2,0.1) x = sin(t)+randn(len(t))*0.1 y = _smooth(x) see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve, scipy.signal.lfilter TODO: the window parameter could be the window itself if a list instead of a string """ if x.ndim != 1: raise ValueError, "smooth only accepts 1 dimension arrays." if x.size < window_len: raise ValueError, "Input vector needs to be bigger than window size." if window_len<3: return x if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']: raise(ValueError, "Window is not one of '{0}', '{1}', '{2}', '{3}', '{4}'".format( *('flat', 'hanning', 'hamming', 'bartlett', 'blackman'))) s = np.r_[x[window_len-1:0:-1], x, x[-1:-window_len:-1]] #print(len(s)) if window == 'flat': #moving average w = np.ones(window_len,'d') else: w = eval('np.' + window + '(window_len)') y = np.convolve(w / w.sum(), s, mode = 'valid') return y def zero_crossings(y_axis, window = 11): """ Algorithm to find zero crossings. Smoothens the curve and finds the zero-crossings by looking for a sign change. keyword arguments: y_axis -- A list containg the signal over which to find zero-crossings window -- the dimension of the smoothing window; should be an odd integer (default: 11) return -- the index for each zero-crossing """ # smooth the curve length = len(y_axis) x_axis = np.asarray(range(length), int) # discard tail of smoothed signal y_axis = _smooth(y_axis, window)[:length] zero_crossings = np.where(np.diff(np.sign(y_axis)))[0] indices = [x_axis[index] for index in zero_crossings] # check if zero-crossings are valid diff = np.diff(indices) if diff.std() / diff.mean() > 0.2: print diff.std() / diff.mean() print np.diff(indices) raise(ValueError, "False zero-crossings found, indicates problem {0} or {1}".format( "with smoothing window", "problem with offset")) # check if any zero crossings were found if len(zero_crossings) < 1: raise(ValueError, "No zero crossings found") return indices # used this to test the fft function's sensitivity to spectral leakage #return indices + np.asarray(30 * np.random.randn(len(indices)), int) ############################Frequency calculation############################# # diff = np.diff(indices) # time_p_period = diff.mean() # # if diff.std() / time_p_period > 0.1: # raise ValueError, # "smoothing window too small, false zero-crossing found" # # #return frequency # return 1.0 / time_p_period ############################################################################## def _test_zero(): _max, _min = peakdetect_zero_crossing(y,x) def _test(): _max, _min = peakdetect(y,x, delta=0.30) def _test_graph(): i = 10000 x = np.linspace(0,3.7*pi,i) y = (0.3*np.sin(x) + np.sin(1.3 * x) + 0.9 * np.sin(4.2 * x) + 0.06 * np.random.randn(i)) y *= -1 x = range(i) @@ -276,4 +691,39 @@ def zero_crossings(y_axis, x_axis = None, window = 49): plot = pylab.plot(x,y) pylab.hold(True) pylab.plot(xm, ym, 'r+') pylab.plot(xn, yn, 'g+') _max, _min = peak_det_bad.peakdetect(y, 0.7, x) xm = [p[0] for p in _max] ym = [p[1] for p in _max] xn = [p[0] for p in _min] yn = [p[1] for p in _min] pylab.plot(xm, ym, 'y*') pylab.plot(xn, yn, 'k*') pylab.show() if __name__ == "__main__": from math import pi import pylab i = 10000 x = np.linspace(0,3.7*pi,i) y = (0.3*np.sin(x) + np.sin(1.3 * x) + 0.9 * np.sin(4.2 * x) + 0.06 * np.random.randn(i)) y *= -1 _max, _min = peakdetect(y, x, 750, 0.30) xm = [p[0] for p in _max] ym = [p[1] for p in _max] xn = [p[0] for p in _min] yn = [p[1] for p in _min] plot = pylab.plot(x, y) pylab.hold(True) pylab.plot(xm, ym, 'r+') pylab.plot(xn, yn, 'g+') pylab.show() -
sixtenbe revised this gist
Aug 29, 2011 . 1 changed file with 18 additions and 23 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,5 +1,4 @@ import numpy as np def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0): """ @@ -38,8 +37,13 @@ def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0): if x_axis is None: x_axis = range(length) #perform some checks if length != len(x_axis): raise ValueError, "Input vectors y_axis and x_axis must have same length" if lookahead < 1: raise ValueError, "Lookahead must be above '1' in value" if not (np.isscalar(delta) and delta >= 0): raise ValueError, "delta must be a positive number" #needs to be a numpy array y_axis = np.asarray(y_axis) @@ -67,13 +71,6 @@ def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0): #set algorithm to only find minima now mx = np.Inf mn = np.Inf ####look for min#### if y > mn+delta and mn != -np.Inf: @@ -85,22 +82,20 @@ def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0): #set algorithm to only find maxima now mn = -np.Inf mx = -np.Inf #Remove the false hit on the first value of the y_axis try: if dump[0]: maxtab.pop(0) #print "pop max" else: mintab.pop(0) #print "pop min" del dump except IndexError: #no peaks were found, should the function return empty lists? pass return maxtab, mintab -
sixtenbe revised this gist
Aug 29, 2011 . 1 changed file with 5 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -18,9 +18,9 @@ def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0): determine if it is the actual peak (default: 500) '(sample / period) / f' where '4 >= f >= 1.25' might be a good value delta -- (optional) this specifies a minimum difference between a peak and the following points, before a peak may be considered a peak. Useful to hinder the algorithm from picking up false peaks towards to end of the signal. To work well delta should be set to 'delta >= RMSnoise * 5'. (default: 0) Delta function causes a 20% decrease in speed, when omitted Correctly used it can double the speed of the algorithm @@ -116,7 +116,9 @@ def peakdetect_zero_crossing(y_axis, x_axis = None, window = 49): Suitable for repeatable sinusoidal signals with some amount of RMS noise tolerable. Excecutes faster than 'peakdetect', although this function will break if the offset of the signal is too large. It should also be noted that the first and last peak will probably not be found, as this algorithm only can find peaks between the first and last zero crossing. keyword arguments: y_axis -- A list containg the signal over which to find peaks -
sixtenbe revised this gist
Aug 29, 2011 . 1 changed file with 3 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -114,6 +114,9 @@ def peakdetect_zero_crossing(y_axis, x_axis = None, window = 49): Division into bins is performed by smoothing the curve and finding the zero crossings. Suitable for repeatable sinusoidal signals with some amount of RMS noise tolerable. Excecutes faster than 'peakdetect', although this function will break if the offset of the signal is too large. keyword arguments: y_axis -- A list containg the signal over which to find peaks -
sixtenbe revised this gist
Aug 29, 2011 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,7 +1,7 @@ import sys from numpy import NaN, Inf, arange, isscalar, asarray def peakdetect(y_axis, x_axis = None, lookahead = 500, delta = 0): """ Converted from/based on a MATLAB script at http://billauer.co.il/peakdet.html -
sixtenbe revised this gist
Aug 29, 2011 . 1 changed file with 258 additions and 56 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -3,75 +3,277 @@ def peakdet(v, delta, x = None): """ Converted from/based on a MATLAB script at http://billauer.co.il/peakdet.html Algorithm for detecting local maximas and minmias in a signal. Discovers peaks by searching for values which are surrounded by lower or larger values for maximas and minimas respectively keyword arguments: y_axis -- A list containg the signal over which to find peaks x_axis -- A x-axis whose values correspond to the 'y_axis' list and is used in the return to specify the postion of the peaks. If omitted the index of the y_axis is used. (default: None) lookahead -- (optional) distance to look ahead from a peak candidate to determine if it is the actual peak (default: 500) '(sample / period) / f' where '4 >= f >= 1.25' might be a good value delta -- (optional) this specifies a minimum difference between a peak and the points following it, before a peak may be considered a peak. Useful to hinder the algorithm from picking up false peaks towards to end of the signal. To work well delta should be set to 'delta >= StdDev * 5'. (default: 0) Delta function causes a 20% decrease in speed, when omitted Correctly used it can double the speed of the algorithm return -- two lists [maxtab, mintab] containing the positive and negative peaks respectively. Each cell of the lists contains a tupple of: (position, peak_value) to get the average peak value do 'np.mean(maxtab, 0)[1]' on the results """ maxtab = [] mintab = [] dump = [] #Used to pop the first hit which always if false length = len(y_axis) if x_axis is None: x_axis = range(length) if length != len(x_axis): raise ValueError, 'Input vectors y_axis and x_axis must have same length' #needs to be a numpy array y_axis = np.asarray(y_axis) #maxima and minima candidates are temporarily stored in #mx and mn respectively mn, mx = np.Inf, -np.Inf #Only detect peak if there is 'lookahead' amount of points after it for index, (x, y) in enumerate(zip(x_axis[:-lookahead], y_axis[:-lookahead])): if y > mx: mx = y mxpos = x if y < mn: mn = y mnpos = x ####look for max#### if y < mx-delta and mx != np.Inf: #Maxima peak candidate found #look ahead in signal to ensure that this is a peak and not jitter if y_axis[index:index+lookahead].max() < mx: maxtab.append((mxpos, mx)) dump.append(True) #set algorithm to only find minima now mx = np.Inf mn = np.Inf if index+lookahead >= length: #end is within lookahead no more peaks can be found break continue #else: #slows shit down this does # mx = ahead # mxpos = x_axis[np.where(y_axis[index:index+lookahead]==mx)] ####look for min#### if y > mn+delta and mn != -np.Inf: #Minima peak candidate found #look ahead in signal to ensure that this is a peak and not jitter if y_axis[index:index+lookahead].min() > mn: mintab.append((mnpos, mn)) dump.append(False) #set algorithm to only find maxima now mn = -np.Inf mx = -np.Inf if index+lookahead >= length: #end is within lookahead no more peaks can be found break #else: #slows shit down this does # mn = ahead # mnpos = x_axis[np.where(y_axis[index:index+lookahead]==mn)] #Remove the false hit on the first value of the y_axis if dump[0]: maxtab.pop(0) #print "pop max" else: mintab.pop(0) #print "pop min" del dump return maxtab, mintab def peakdetect_zero_crossing(y_axis, x_axis = None, window = 49): """ Algorithm for detecting local maximas and minmias in a signal. Discovers peaks by dividing the signal into bins and retrieving the maximum and minimum value of each the even and odd bins respectively. Division into bins is performed by smoothing the curve and finding the zero crossings. keyword arguments: y_axis -- A list containg the signal over which to find peaks x_axis -- A x-axis whose values correspond to the 'y_axis' list and is used in the return to specify the postion of the peaks. If omitted the index of the y_axis is used. (default: None) window -- the dimension of the smoothing window; should be an odd integer (default: 49) return -- two lists [maxtab, mintab] containing the positive and negative peaks respectively. Each cell of the lists contains a tupple of: (position, peak_value) to get the average peak value do 'np.mean(maxtab, 0)[1]' on the results """ if x_axis is None: x_axis = range(len(y_axis)) length = len(y_axis) if length != len(x_axis): raise ValueError, 'Input vectors y_axis and x_axis must have same length' #needs to be a numpy array y_axis = np.asarray(y_axis) zero_indices = zero_crossings(y_axis, window = window) period_lengths = np.diff(zero_indices) bins = [y_axis[indice:indice+diff] for indice, diff in zip(zero_indices, period_lengths)] even_bins = bins[::2] odd_bins = bins[1::2] #check if even bin contains maxima if even_bins[0].max() > abs(even_bins[0].min()): hi_peaks = [bin.max() for bin in even_bins] lo_peaks = [bin.min() for bin in odd_bins] else: hi_peaks = [bin.max() for bin in odd_bins] lo_peaks = [bin.min() for bin in even_bins] hi_peaks_x = [x_axis[np.where(y_axis==peak)[0]] for peak in hi_peaks] lo_peaks_x = [x_axis[np.where(y_axis==peak)[0]] for peak in lo_peaks] maxtab = [(x,y) for x,y in zip(hi_peaks, hi_peaks_x)] mintab = [(x,y) for x,y in zip(lo_peaks, lo_peaks_x)] return maxtab, mintab def smooth(x,window_len=11,window='hanning'): """ smooth the data using a window with requested size. This method is based on the convolution of a scaled window with the signal. The signal is prepared by introducing reflected copies of the signal (with the window size) in both ends so that transient parts are minimized in the begining and end part of the output signal. input: x: the input signal window_len: the dimension of the smoothing window; should be an odd integer window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman' flat window will produce a moving average smoothing. output: the smoothed signal example: t=linspace(-2,2,0.1) x=sin(t)+randn(len(t))*0.1 y=smooth(x) see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve scipy.signal.lfilter TODO: the window parameter could be the window itself if an array instead of a string """ if x.ndim != 1: raise ValueError, "smooth only accepts 1 dimension arrays." if x.size < window_len: raise ValueError, "Input vector needs to be bigger than window size." if window_len<3: return x if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']: raise ValueError, "Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'" s=np.r_[x[window_len-1:0:-1],x,x[-1:-window_len:-1]] #print(len(s)) if window == 'flat': #moving average w=np.ones(window_len,'d') else: w=eval('np.'+window+'(window_len)') y=np.convolve(w/w.sum(),s,mode='valid') return y def zero_crossings(y_axis, x_axis = None, window = 49): """ Algorithm to find zero crossings. Smoothens the curve and finds the zero-crossings by looking for a sign change. keyword arguments: y_axis -- A list containg the signal over which to find zero-crossings x_axis -- A x-axis whose values correspond to the 'y_axis' list and is used in the return to specify the postion of the zero-crossings. If omitted then the indice of the y_axis is used. (default: None) window -- the dimension of the smoothing window; should be an odd integer (default: 49) return -- the x_axis value or the indice for each zero-crossing """ #smooth the curve length = len(y_axis) if x_axis == None: x_axis = range(length) x_axis = np.asarray(x_axis) y_axis = smooth(y_axis, window)[:length] zero_crossings = np.where(np.diff(np.sign(y_axis)))[0] times = [x_axis[indice] for indice in zero_crossings] #check if zero-crossings are valid diff = np.diff(times) if diff.std() / diff.mean() > 0.1: raise ValueError, "smoothing window too small, false zero-crossings found" return times if __name__=="__main__": import pylab from math import pi i = 10000 x = np.linspace(0,3.7*pi,i) y = 0.3*np.sin(x) + np.sin(1.3*x) + 0.9*np.sin(4.2*x) + 0.06*np.random.randn(i) y *= -1 x = range(i) _max, _min = peakdetect(y,x,750, 0.30) xm = [p[0] for p in _max] ym = [p[1] for p in _max] xn = [p[0] for p in _min] yn = [p[1] for p in _min] plot = pylab.plot(x,y) pylab.hold(True) pylab.plot(xm, ym, 'r+') pylab.plot(xn, yn, 'g+') -
endolith revised this gist
Nov 30, 2010 . 1 changed file with 4 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -71,3 +71,7 @@ def peakdet(v, delta, x = None): lookformax = True return maxtab, mintab if __name__=="__main__": series = [0,0,0,2,0,0,0,-2,0,0,0,2,0,0,0,-2,0] print peakdet(series,1) -
endolith revised this gist
Nov 30, 2010 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,5 +1,5 @@ import sys from numpy import NaN, Inf, arange, isscalar, asarray def peakdet(v, delta, x = None): """ -
endolith created this gist
Dec 7, 2009 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,73 @@ import sys from numpy import NaN, Inf, arange, isscalar def peakdet(v, delta, x = None): """ Converted from MATLAB script at http://billauer.co.il/peakdet.html Currently returns two lists of tuples, but maybe arrays would be better function [maxtab, mintab]=peakdet(v, delta, x) %PEAKDET Detect peaks in a vector % [MAXTAB, MINTAB] = PEAKDET(V, DELTA) finds the local % maxima and minima ("peaks") in the vector V. % MAXTAB and MINTAB consists of two columns. Column 1 % contains indices in V, and column 2 the found values. % % With [MAXTAB, MINTAB] = PEAKDET(V, DELTA, X) the indices % in MAXTAB and MINTAB are replaced with the corresponding % X-values. % % A point is considered a maximum peak if it has the maximal % value, and was preceded (to the left) by a value lower by % DELTA. % Eli Billauer, 3.4.05 (Explicitly not copyrighted). % This function is released to the public domain; Any use is allowed. """ maxtab = [] mintab = [] if x is None: x = arange(len(v)) v = asarray(v) if len(v) != len(x): sys.exit('Input vectors v and x must have same length') if not isscalar(delta): sys.exit('Input argument delta must be a scalar') if delta <= 0: sys.exit('Input argument delta must be positive') mn, mx = Inf, -Inf mnpos, mxpos = NaN, NaN lookformax = True for i in arange(len(v)): this = v[i] if this > mx: mx = this mxpos = x[i] if this < mn: mn = this mnpos = x[i] if lookformax: if this < mx-delta: maxtab.append((mxpos, mx)) mn = this mnpos = x[i] lookformax = False else: if this > mn+delta: mintab.append((mnpos, mn)) mx = this mxpos = x[i] lookformax = True return maxtab, mintab