-
-
Save ximeg/587011a65d05f067a29ce9c22894d1d2 to your computer and use it in GitHub Desktop.
| #!/usr/bin/env python | |
| # Implementation of algorithm from http://stackoverflow.com/a/22640362/6029703 | |
| import numpy as np | |
| import pylab | |
| def thresholding_algo(y, lag, threshold, influence): | |
| signals = np.zeros(len(y)) | |
| filteredY = np.array(y) | |
| avgFilter = [0]*len(y) | |
| stdFilter = [0]*len(y) | |
| avgFilter[lag - 1] = np.mean(y[0:lag]) | |
| stdFilter[lag - 1] = np.std(y[0:lag]) | |
| for i in range(lag, len(y) - 1): | |
| if abs(y[i] - avgFilter[i-1]) > threshold * stdFilter [i-1]: | |
| if y[i] > avgFilter[i-1]: | |
| signals[i] = 1 | |
| else: | |
| signals[i] = -1 | |
| filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1] | |
| avgFilter[i] = np.mean(filteredY[(i-lag):i]) | |
| stdFilter[i] = np.std(filteredY[(i-lag):i]) | |
| else: | |
| signals[i] = 0 | |
| filteredY[i] = y[i] | |
| avgFilter[i] = np.mean(filteredY[(i-lag):i]) | |
| stdFilter[i] = np.std(filteredY[(i-lag):i]) | |
| return dict(signals = np.asarray(signals), | |
| avgFilter = np.asarray(avgFilter), | |
| stdFilter = np.asarray(stdFilter)) | |
| # Data | |
| y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9, | |
| 1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3, | |
| 2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1]) | |
| # Settings: lag = 30, threshold = 5, influence = 0 | |
| lag = 30 | |
| threshold = 5 | |
| influence = 0 | |
| # Run algo with settings from above | |
| result = thresholding_algo(y, lag=lag, threshold=threshold, influence=influence) | |
| # Plot result | |
| pylab.subplot(211) | |
| pylab.plot(np.arange(1, len(y)+1), y) | |
| pylab.plot(np.arange(1, len(y)+1), | |
| result["avgFilter"], color="cyan", lw=2) | |
| pylab.plot(np.arange(1, len(y)+1), | |
| result["avgFilter"] + threshold * result["stdFilter"], color="green", lw=2) | |
| pylab.plot(np.arange(1, len(y)+1), | |
| result["avgFilter"] - threshold * result["stdFilter"], color="green", lw=2) | |
| pylab.subplot(212) | |
| pylab.step(np.arange(1, len(y)+1), result["signals"], color="red", lw=2) | |
| pylab.ylim(-1.5, 1.5) | |
Any idea how to modify this so it works with a real time data stream? I mean without recalculating the entire series each time, but instead to only recalculate the last item passed to the function.
Thanks for posting this. I noticed a bug on line 13.
It should be:
for i in range(lag, len(y)):
(otherwise the last item in the y list never gets checked).
i think
np.std(y[0:lag], ddof=1)
you use the wrong degrees of freedom
Thank you so much for your code!
With small changes, your code could be faster. I changed the list to arrays and used numba for optimization.
With large time series, your code takes approximately 35 s, with changes the code takes 1.5 s.
Check here:
https://gist.github.com/dlegor/286dd5cd5ecc317c34ed81aaed4a2b37
Thank you for this code,
I did some modifications for the estimation of avg and std.
https://gist.github.com/PeiLi-Sandman/d67baad835a516481e6ca124a08840f2

Yes!!! very nice. Thankyou