Disclaimer: I am probably not as good at DSP as I should be and therefore have more issues than I should have getting this code to work.
I need to filter incoming signals as they happen. I tried to make this code to work, but I have not been able to so far. Referencing scipy.signal.lfilter doc
import numpy as np
import scipy.signal
import matplotlib.pyplot as plt
from lib import fnlib
samples = 100
x = np.linspace(0, 7, samples)
y = [] # Unfiltered output
y_filt1 = [] # Real-time filtered
nyq = 0.5 * samples
f1_norm = 0.1 / nyq
f2_norm = 2 / nyq
b, a = scipy.signal.butter(2, [f1_norm, f2_norm], 'band', analog=False)
zi = scipy.signal.lfilter_zi(b,a)
zi = zi*(np.sin(0) + 0.1*np.sin(15*0))
This sets zi
as zi*y[0 ]
initially, which in this case is 0. I have got it from the example code in the lfilter
documentation, but I am not sure if this is correct at all.
Then it comes to the point where I am not sure what to do with the few initial samples.
The coefficients a
and b
are len(a) = 5
here.
As lfilter
takes input values from now to n-4, do I pad it with zeroes, or do I need to wait until 5 samples have gone by and take them as a single bloc, then continuously sample each next step in the same way?
for i in range(0, len(a)-1): # Append 0 as initial values, wrong?
y.append(0)
step = 0
for i in xrange(0, samples): #x:
tmp = np.sin(x[i]) + 0.1*np.sin(15*x[i])
y.append(tmp)
# What to do with the inital filterings until len(y) == len(a) ?
if (step> len(a)):
y_filt, zi = scipy.signal.lfilter(b, a, y[-len(a):], axis=-1, zi=zi)
y_filt1.append(y_filt[4])
print(len(y))
y = y[4:]
print(len(y))
y_filt2 = scipy.signal.lfilter(b, a, y) # Offline filtered
plt.plot(x, y, x, y_filt1, x, y_filt2)
plt.show()
It is a vector (or array of vectors for an N-dimensional input) of length max(len(a), len(b)) - 1 . If zi is None or is not given then initial rest is assumed. See lfiltic for more information. Returns yarray. The output of the digital filter.
I think I had the same problem, and found a solution on https://github.com/scipy/scipy/issues/5116:
from scipy import zeros, signal, random
def filter_sbs():
data = random.random(2000)
b = signal.firwin(150, 0.004)
z = signal.lfilter_zi(b, 1) * data[0]
result = zeros(data.size)
for i, x in enumerate(data):
result[i], z = signal.lfilter(b, 1, [x], zi=z)
return result
if __name__ == '__main__':
result = filter_sbs()
The idea is to pass the filter state z
in each subsequent call to lfilter
. For the first few samples the filter may give strange results, but later (depending on the filter length) it starts to behave correctly.
The problem is not how you are buffering the input. The problem is that in the 'offline' version, the state of the filter is initialized using lfilter_zi
which computes the internal state of an LTI so that the output will already be in steady-state when new samples arrive at the input. In the 'real-time' version, you skip this so that the filter's initial state is 0. You can either initialize both versions to using lfilter_zi
or else initialize both to 0. Then, it doesn't matter how many samples you filter at a time.
Note, if you initialize to 0, the filter will 'ring' for a certain amount of time before reaching a steady state. In the case of FIR filters, there is an analytic solution for determining this time. For many IIR filters, there is not.
This following is correct. For simplicity's sake I initialize to 0 and feed the input on sample at a time. However, any non-zero block size will produce equivalent output.
from scipy import signal, random
from numpy import zeros
def filter_sbs(data, b):
z = zeros(b.size-1)
result = zeros(data.size)
for i, x in enumerate(data):
result[i], z = signal.lfilter(b, 1, [x], zi=z)
return result
def filter(data, b):
result = signal.lfilter(b,1,data)
return result
if __name__ == '__main__':
data = random.random(20000)
b = signal.firwin(150, 0.004)
result1 = filter_sbs(data, b)
result2 = filter(data, b)
print(result1 - result2)
Output:
[ 0.00000000e+00 0.00000000e+00 0.00000000e+00 ... -5.55111512e-17
0.00000000e+00 1.66533454e-16]
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With