Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Filtering based on the last two current values in Java rx

I'm trying to build a simple application using java reactive extensions. I have two streams that emits temperature values continuously, I want to detect and filter out spikes of sensed temperature that could be errors, for doing so I need to take account of the precedent value too so that I can take account of the variation like so:

filtering on pairs idea

Still I was unable to find the right operator in the documentation. Has anybody any idea of how can I accomplish the task? Should I make a custom operator?

These are my streams:

double min = 50, max = 75, spikeFreq = 0.01;
    Observable<Double> tempStream1 = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new TempStream(subscriber, min, max, spikeFreq).start();
    });

    Observable<Double> tempStream2 = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new TempStream(subscriber, min, max, spikeFreq).start();
    });

public class TempStream extends Thread{

private Subscriber<? super Double> subscriber;
private TempSensor sensor;

public TempStream(Subscriber<? super Double> subscriber, double min,
        double max, double spikeFreq) {
    this.subscriber = subscriber;
    sensor = new TempSensor(min, max, spikeFreq);
}

    @Override
    public void run() {
        Random gen = new Random(System.currentTimeMillis());
        while (!subscriber.isUnsubscribed()) {
            try {
                subscriber.onNext(sensor.getCurrentValue());
                Thread.sleep(1000 + gen.nextInt() % 1000);
            } catch (Exception ex) {
                subscriber.onError(ex);
            }
        }
        subscriber.onCompleted();
    }
}
like image 585
Pievis Avatar asked Jun 03 '15 08:06

Pievis


1 Answers

Perhaps the buffer operator (http://reactivex.io/documentation/operators/buffer.html) might help in this case. You want to use buffer with count = 2 and skip = 1. That way you'll get a "lookahead" of one element on the stream.

E.g.:

stream.buffer(2,1).filter(buf -> buf.size() == 2 && buf.get(0) - buf.get(1) < max);

Notice that this example also checks whether two values were buffered since it might happen that only one is emitted upon completion of the stream.

like image 52
Georgi Khomeriki Avatar answered Nov 15 '22 12:11

Georgi Khomeriki