I have a signal of electromyographical data that I am supposed (scientific papers' explicit recommendation) to smooth using RMS.
I have the following working code, producing the desired output, but it is way slower than I think it's possible.
#!/usr/bin/python
import numpy
def rms(interval, halfwindow):
""" performs the moving-window smoothing of a signal using RMS """
n = len(interval)
rms_signal = numpy.zeros(n)
for i in range(n):
small_index = max(0, i - halfwindow) # intended to avoid boundary effect
big_index = min(n, i + halfwindow) # intended to avoid boundary effect
window_samples = interval[small_index:big_index]
# here is the RMS of the window, being attributed to rms_signal 'i'th sample:
rms_signal[i] = sqrt(sum([s**2 for s in window_samples])/len(window_samples))
return rms_signal
I have seen some deque
and itertools
suggestions regarding optimization of moving window loops, and also convolve
from numpy, but I couldn't figure it out how to accomplish what I want using them.
Also, I do not care to avoid boundary problems anymore, because I end up having large arrays and relatively small sliding windows.
Thanks for reading
Definition. The RMS value of a set of values (or a continuous-time waveform) is the square root of the arithmetic mean of the squares of the values, or the square of the function that defines the continuous waveform.
The RMS value based on the square root of the sum of squares This theorem says that the integral of the square of a function is equal with the integral of the squared components of its spectrum. This means that the total energy of a waveform can be found in the total energy of the waveform's components.
The square root of the mean of the square. RMS is (to engineers anyway) a meaningful way of calculating the average of values over a period of time. With audio, the signal value (amplitude) is squared, averaged over a period of time, then the square root of the result is calculated.
It is possible to use convolution to perform the operation you are referring to. I did it a few times for processing EEG signals as well.
import numpy as np
def window_rms(a, window_size):
a2 = np.power(a,2)
window = np.ones(window_size)/float(window_size)
return np.sqrt(np.convolve(a2, window, 'valid'))
Breaking it down, the np.power(a, 2)
part makes a new array with the same dimension as a
, but where each value is squared. np.ones(window_size)/float(window_size)
produces an array or length window_size
where each element is 1/window_size
. So the convolution effectively produces a new array where each element i
is equal to
(a[i]^2 + a[i+1]^2 + … + a[i+window_size]^2)/window_size
which is the RMS value of the array elements within the moving window. It should perform really well this way.
Note, though, that np.power(a, 2)
produces a new array of same dimension. If a
is really large, I mean sufficiently large that it cannot fit twice in memory, you might need a strategy where each element are modified in place. Also, the 'valid'
argument specifies to discard border effects, resulting in a smaller array produced by np.convolve()
. You could keep it all by specifying 'same'
instead (see documentation).
I found my machine struggling with convolve, so I propose the following solution:
Compute Moving RMS Window Quickly
Suppose we have analog voltage samples a0 ... a99 (one hundred samples) and we need to take moving RMS of 10 samples through them.
The window will scan initially from elements a0 to a9 (ten samples) to get rms0.
# rms = [rms0, rms1, ... rms99-9] (total of 91 elements in list):
(rms0)^2 = (1/10) (a0^2 + ... + a9^2) # --- (note 1)
(rms1)^2 = (1/10) (... a1^2 + ... + a9^2 + a10^2) # window moved a step, a0 falls out, a10 comes in
(rms2)^2 = (1/10) ( a2^2 + ... + a10^2 + a11^2) # window moved another step, a1 falls out, a11 comes in
...
Simplifying it: We havea = [a0, ... a99]
To create moving RMS of 10 samples, we can take sqrt of the addition of 10 a^2
's and multiplied by 1/10.
In other words, if we have
p = (1/10) * a^2 = 1/10 * [a0^2, ... a99^2]
To get rms^2
simply add a group of 10 p's.
Let's have an acummulator acu:
acu = p0 + ... p8 # (as in note 1 above)
Then we can have
rms0^2 = p0 + ... p8 + p9
= acu + p9
rms1^2 = acu + p9 + p10 - p0
rms2^2 = acu + p9 + p10 + p11 - p0 - p1
...
we can create:
V0 = [acu, 0, 0, ... 0]
V1 = [ p9, p10, p11, .... p99] -- len=91
V2 = [ 0, -p0, -p1, ... -p89] -- len=91
V3 = V0 + V1 + V2
if we run
itertools.accumulate(V3)
we will get rms array
Code:
import numpy as np
from itertools import accumulate
a2 = np.power(in_ch, 2) / tm_w # create array of p, in_ch is samples, tm_w is window length
v1 = np.array(a2[tm_w - 1 : ]) # v1 = [p9, p10, ...]
v2 = np.append([0], a2[0 : len(a2) - tm_w]) # v2 = [0, p0, ...]
acu = list(accumulate(a2[0 : tm_w - 1])) # get initial accumulation (acu) of the window - 1
v1[0] = v1[0] + acu[-1] # rms element #1 will be at end of window and contains the accumulation
rmspw2 = list(accumulate(v1 - v2))
rms = np.power(rmspw2, 0.5)
I can compute an array of 128 Mega samples in less than 1 minute.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With