I'm trying to build a simple application using java reactive extensions. I have two streams that emits temperature values continuously, I want to detect and filter out spikes of sensed temperature that could be errors, for doing so I need to take account of the precedent value too so that I can take account of the variation like so:
Still I was unable to find the right operator in the documentation. Has anybody any idea of how can I accomplish the task? Should I make a custom operator?
These are my streams:
double min = 50, max = 75, spikeFreq = 0.01;
Observable<Double> tempStream1 = Observable.create((
Subscriber<? super Double> subscriber) -> {
new TempStream(subscriber, min, max, spikeFreq).start();
});
Observable<Double> tempStream2 = Observable.create((
Subscriber<? super Double> subscriber) -> {
new TempStream(subscriber, min, max, spikeFreq).start();
});
public class TempStream extends Thread{
private Subscriber<? super Double> subscriber;
private TempSensor sensor;
public TempStream(Subscriber<? super Double> subscriber, double min,
double max, double spikeFreq) {
this.subscriber = subscriber;
sensor = new TempSensor(min, max, spikeFreq);
}
@Override
public void run() {
Random gen = new Random(System.currentTimeMillis());
while (!subscriber.isUnsubscribed()) {
try {
subscriber.onNext(sensor.getCurrentValue());
Thread.sleep(1000 + gen.nextInt() % 1000);
} catch (Exception ex) {
subscriber.onError(ex);
}
}
subscriber.onCompleted();
}
}
Perhaps the buffer
operator (http://reactivex.io/documentation/operators/buffer.html) might help in this case.
You want to use buffer
with count = 2
and skip = 1
. That way you'll get a "lookahead" of one element on the stream.
E.g.:
stream.buffer(2,1).filter(buf -> buf.size() == 2 && buf.get(0) - buf.get(1) < max);
Notice that this example also checks whether two values were buffered since it might happen that only one is emitted upon completion of the stream.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With