I have a longish signal (454912 samples) and I'd like to compute an estimate of the amount of 50Hz in it. Speed is more important here than precision. The amount of 50Hz present is expected to fluctuate over time. The value needs to be representative for the entire recording, for instance the mean.
The signal is recorded from an EEG electrode. When EEG electrodes have bad contact with the scalp, there will be a large amount of 50Hz powerline noise in the signal. I would like to discard all data from EEG electrodes that have excessively more 50Hz noise than other electrodes.
Solving the problem is not that hard. Anything from FFT to Welch's method can be employed to estimate the power spectrum:
import numpy as np
from scipy.signal import welch
# generate an example signal
sfreq = 128.
nsamples = 454912
time = np.arange(nsamples) / sfreq
x = np.sin(2 * np.pi * 50 * time) + np.random.randn(nsamples)
# apply Welch' method to the problem
fs, ps = welch(x, sfreq)
print 'Amount of 50Hz:', ps[np.searchsorted(fs, 50)]
However, computing the power at all frequencies seems unnecessary here and I have the feeling that there is a more efficient solution. Something along the lines of computing a single DFFT step? Convolution with some sinoid wavelet?
Welch's method just computes the periodogram for multiple overlapping segments of the signal, then takes the average across segments. This effectively trades resolution for noise reduction in the frequency domain.
However, performing lots of separate FFTs for each small segment will be more expensive than computing fewer FFTs for larger segments. Depending on your needs, you may get away with using Welch's method but splitting your signal into larger segments, and/or having less overlap between them (both of which will give you less variance reduction in the PSD).
from matplotlib import pyplot as plt
# default parameters
fs1, ps1 = welch(x, sfreq, nperseg=256, noverlap=128)
# 8x the segment size, keeping the proportional overlap the same
fs2, ps2 = welch(x, sfreq, nperseg=2048, noverlap=1024)
# no overlap between the segments
fs3, ps3 = welch(x, sfreq, nperseg=2048, noverlap=0)
fig, ax1 = plt.subplots(1, 1)
ax1.hold(True)
ax1.loglog(fs1, ps1, label='Welch, defaults')
ax1.loglog(fs2, ps2, label='length=2048, overlap=1024')
ax1.loglog(fs3, ps3, label='length=2048, overlap=0')
ax1.legend(loc=2, fancybox=True)
Increasing the segment size and reducing the amount of overlap can substantially improve performance:
In [1]: %timeit welch(x, sfreq)
1 loops, best of 3: 262 ms per loop
In [2]: %timeit welch(x, sfreq, nperseg=2048, noverlap=1024)
10 loops, best of 3: 46.4 ms per loop
In [3]: %timeit welch(x, sfreq, nperseg=2048, noverlap=0)
10 loops, best of 3: 23.2 ms per loop
Note that it's a good idea to use a power of 2 for the window size, since it's faster to take the FFT of a signal whose length is a power of 2.
Another simple thing you might consider trying is just bandpass filtering your signal with a notch filter centered on 50Hz. The envelope of the filtered signal will give you a measure of how much 50Hz power your signal contains over time.
from scipy.signal import filter_design, filtfilt
# a signal whose power at 50Hz varies over time
sfreq = 128.
nsamples = 454912
time = np.arange(nsamples) / sfreq
sinusoid = np.sin(2 * np.pi * 50 * time)
pow50hz = np.zeros(nsamples)
pow50hz[nsamples / 4: 3 * nsamples / 4] = 1
x = pow50hz * sinusoid + np.random.randn(nsamples)
# Chebyshev notch filter centered on 50Hz
nyquist = sfreq / 2.
b, a = filter_design.iirfilter(3, (49. / nyquist, 51. / nyquist), rs=10,
ftype='cheby2')
# filter the signal
xfilt = filtfilt(b, a, x)
fig, ax2 = plt.subplots(1, 1)
ax2.hold(True)
ax2.plot(time[::10], x[::10], label='Raw signal')
ax2.plot(time[::10], xfilt[::10], label='50Hz bandpass-filtered')
ax2.set_xlim(time[0], time[-1])
ax2.set_xlabel('Time')
ax2.legend(fancybox=True)
Having seen @hotpaw2's answer I decided to try implementing the Goertzel algorithm, just for fun. Unfortunately it's a recursive algorithm (and therefore can't be vectorized over time), so I decided to write myself a Cython version:
# cython: boundscheck=False
# cython: wraparound=False
# cython: cdivision=True
from libc.math cimport cos, M_PI
cpdef double goertzel(double[:] x, double ft, double fs=1.):
"""
The Goertzel algorithm is an efficient method for evaluating single terms
in the Discrete Fourier Transform (DFT) of a signal. It is particularly
useful for measuring the power of individual tones.
Arguments
----------
x double array [nt,]; the signal to be decomposed
ft double scalar; the target frequency at which to evaluate the DFT
fs double scalar; the sample rate of x (same units as ft, default=1)
Returns
----------
p double scalar; the DFT coefficient corresponding to ft
See: <http://en.wikipedia.org/wiki/Goertzel_algorithm>
"""
cdef:
double s
double s_prev = 0
double s_prev2 = 0
double coeff = 2 * cos(2 * M_PI * (ft / fs))
Py_ssize_t N = x.shape[0]
Py_ssize_t ii
for ii in range(N):
s = x[ii] + (coeff * s_prev) - s_prev2
s_prev2 = s_prev
s_prev = s
return s_prev2 * s_prev2 + s_prev * s_prev - coeff * s_prev * s_prev2
Here's what it does:
freqs = np.linspace(49, 51, 1000)
pows = np.array([goertzel(x, ff, sfreq) for ff in freqs])
fig, ax = plt.subplots(1, 1)
ax.plot(freqs, pows, label='DFT coefficients')
ax.set_xlabel('Frequency (Hz)')
ax.legend(loc=1)
It is pretty damn fast:
In [1]: %timeit goertzel(x, 50, sfreq)
1000 loops, best of 3: 1.98 ms per loop
Obviously this approach only makes sense if you're just interested in a single frequency rather than a range of frequencies.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With