Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to real-time filter with scipy and lfilter?

Disclaimer: I am probably not as good at DSP as I should be and therefore have more issues than I should have getting this code to work.

I need to filter incoming signals as they happen. I tried to make this code to work, but I have not been able to so far. Referencing scipy.signal.lfilter doc

import numpy as np
import scipy.signal
import matplotlib.pyplot as plt
from lib import fnlib

samples = 100
x = np.linspace(0, 7, samples)
y = [] # Unfiltered output
y_filt1 = [] # Real-time filtered

nyq = 0.5 * samples
f1_norm = 0.1 / nyq
f2_norm = 2 / nyq
b, a = scipy.signal.butter(2, [f1_norm, f2_norm], 'band', analog=False)
zi = scipy.signal.lfilter_zi(b,a)
zi = zi*(np.sin(0) + 0.1*np.sin(15*0))

This sets zi as zi*y[0 ] initially, which in this case is 0. I have got it from the example code in the lfilter documentation, but I am not sure if this is correct at all.

Then it comes to the point where I am not sure what to do with the few initial samples. The coefficients a and b are len(a) = 5 here. As lfilter takes input values from now to n-4, do I pad it with zeroes, or do I need to wait until 5 samples have gone by and take them as a single bloc, then continuously sample each next step in the same way?

for i in range(0, len(a)-1): # Append 0 as initial values, wrong?
    y.append(0)

step = 0
for i in xrange(0, samples): #x:
    tmp = np.sin(x[i]) + 0.1*np.sin(15*x[i])
    y.append(tmp)

    # What to do with the inital filterings until len(y) ==  len(a) ?

    if (step> len(a)):
        y_filt, zi = scipy.signal.lfilter(b, a, y[-len(a):], axis=-1, zi=zi)
        y_filt1.append(y_filt[4])

print(len(y))
y = y[4:]
print(len(y))
y_filt2 = scipy.signal.lfilter(b, a, y) # Offline filtered

plt.plot(x, y, x, y_filt1, x, y_filt2)
plt.show()
like image 441
trondhe Avatar asked Nov 08 '16 09:11

trondhe


People also ask

What is lfilter scipy?

It is a vector (or array of vectors for an N-dimensional input) of length max(len(a), len(b)) - 1 . If zi is None or is not given then initial rest is assumed. See lfiltic for more information. Returns yarray. The output of the digital filter.


2 Answers

I think I had the same problem, and found a solution on https://github.com/scipy/scipy/issues/5116:

from scipy import zeros, signal, random

def filter_sbs():
    data = random.random(2000)
    b = signal.firwin(150, 0.004)
    z = signal.lfilter_zi(b, 1) * data[0]
    result = zeros(data.size)
    for i, x in enumerate(data):
        result[i], z = signal.lfilter(b, 1, [x], zi=z)
    return result

if __name__ == '__main__':
    result = filter_sbs()

The idea is to pass the filter state z in each subsequent call to lfilter. For the first few samples the filter may give strange results, but later (depending on the filter length) it starts to behave correctly.

like image 182
dloeckx Avatar answered Nov 15 '22 17:11

dloeckx


The problem is not how you are buffering the input. The problem is that in the 'offline' version, the state of the filter is initialized using lfilter_zi which computes the internal state of an LTI so that the output will already be in steady-state when new samples arrive at the input. In the 'real-time' version, you skip this so that the filter's initial state is 0. You can either initialize both versions to using lfilter_zi or else initialize both to 0. Then, it doesn't matter how many samples you filter at a time.

Note, if you initialize to 0, the filter will 'ring' for a certain amount of time before reaching a steady state. In the case of FIR filters, there is an analytic solution for determining this time. For many IIR filters, there is not.

This following is correct. For simplicity's sake I initialize to 0 and feed the input on sample at a time. However, any non-zero block size will produce equivalent output.

from scipy import signal, random
from numpy import zeros

def filter_sbs(data, b):
    z = zeros(b.size-1)
    result = zeros(data.size)
    for i, x in enumerate(data):
        result[i], z = signal.lfilter(b, 1, [x], zi=z)
    return result
    
def filter(data, b):
    result = signal.lfilter(b,1,data)
    return result

if __name__ == '__main__':
    data = random.random(20000)
    b = signal.firwin(150, 0.004)
    result1 = filter_sbs(data, b)
    result2 = filter(data, b)
    print(result1 - result2)

Output:

[ 0.00000000e+00  0.00000000e+00  0.00000000e+00 ... -5.55111512e-17
  0.00000000e+00  1.66533454e-16]
like image 39
dmedine Avatar answered Nov 15 '22 18:11

dmedine