I need to use a Hampel filter on my data, stripping outliers.
I haven't been able to find an existing one in Python; only in Matlab and R.
[Matlab function description][1]
[Stats Exchange discussion of Matlab Hampel function][2]
[R pracma package vignette; contains hampel function][3]
I've written the following function, modeling it off the function in the R pracma package; however, it is far far slower than the Matlab version. This is not ideal; would appreciate input on how to speed it up.
The function is shown below-
def hampel(x,k, t0=3):
'''adapted from hampel function in R package pracma
x= 1-d numpy array of numbers to be filtered
k= number of items in window/2 (# forward and backward wanted to capture in median filter)
t0= number of standard deviations to use; 3 is default
'''
n = len(x)
y = x #y is the corrected series
L = 1.4826
for i in range((k + 1),(n - k)):
if np.isnan(x[(i - k):(i + k+1)]).all():
continue
x0 = np.nanmedian(x[(i - k):(i + k+1)])
S0 = L * np.nanmedian(np.abs(x[(i - k):(i + k+1)] - x0))
if (np.abs(x[i] - x0) > t0 * S0):
y[i] = x0
return(y)
The R implementation in "pracma" package, which I am using as a model:
function (x, k, t0 = 3)
{
n <- length(x)
y <- x
ind <- c()
L <- 1.4826
for (i in (k + 1):(n - k)) {
x0 <- median(x[(i - k):(i + k)])
S0 <- L * median(abs(x[(i - k):(i + k)] - x0))
if (abs(x[i] - x0) > t0 * S0) {
y[i] <- x0
ind <- c(ind, i)
}
}
list(y = y, ind = ind)
}
Any help in making function more efficient, or a pointer to an existing implementation in an existing Python module would be much appreciated. Example data below; %%timeit cell magic in Jupyter indicates it currently takes 15 seconds to run:
vals=np.random.randn(250000)
vals[3000]=100
vals[200]=-9000
vals[-300]=8922273
%%timeit
hampel(vals, k=6)
[1]: https://www.mathworks.com/help/signal/ref/hampel.html [2]: https://dsp.stackexchange.com/questions/26552/what-is-a-hampel-filter-and-how-does-it-work [3]: https://cran.r-project.org/web/packages/pracma/pracma.pdf
It uses robust moving estimates to identify outliers in a time series. If the method identifies an outlier, you might decide to replace the extreme value with an imputed value, such as the rolling median at that time point. This kind of imputation is known as the Hampel filter.
B = rmoutliers( A , movmethod , window ) detects local outliers using a moving window mean or median with window length window . For example, rmoutliers(A,"movmean",5) defines outliers as elements more than three local standard deviations from the local mean within a five-element window.
Remove outliers in the raw data by applying Hampel function. Specify the window size as 6, or about three minutes of data on either side of each sample in the measurement window. This setting allows for sufficient data to decide whether each point is an outlier.
Use hampel to locate every sample that differs by more than three standard deviations from the local median. The measurement window is composed of the sample and its six surrounding samples, three per side. [y,i,xmedian,xsigma] = hampel(x); Plot the filtered signal and annotate the outliers.
Solution by @EHB above is helpful, but it is incorrect. Specifically, the rolling median calculated in median_abs_deviation is of difference, which itself is the difference between each data point and the rolling median calculated in rolling_median, but it should be the median of differences between the data in the rolling window and the median over the window. I took the code above and modified it:
def hampel(vals_orig, k=7, t0=3):
'''
vals: pandas series of values from which to remove outliers
k: size of window (including the sample; 7 is equal to 3 on either side of value)
'''
#Make copy so original not edited
vals = vals_orig.copy()
#Hampel Filter
L = 1.4826
rolling_median = vals.rolling(window=k, center=True).median()
MAD = lambda x: np.median(np.abs(x - np.median(x)))
rolling_MAD = vals.rolling(window=k, center=True).apply(MAD)
threshold = t0 * L * rolling_MAD
difference = np.abs(vals - rolling_median)
'''
Perhaps a condition should be added here in the case that the threshold value
is 0.0; maybe do not mark as outlier. MAD may be 0.0 without the original values
being equal. See differences between MAD vs SDV.
'''
outlier_idx = difference > threshold
vals[outlier_idx] = rolling_median[outlier_idx]
return(vals)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With