RX: Stateful transform of sequence, e.g. exponential moving average

How can you do a simple, stateful transform of a sequence in Rx?

Say we want to compute an exponential moving average of an IObservable<double> noisySequence.

Whenever noisySequence ticks, emaSequence should tick and return the value (previousEmaSequenceValue*(1-lambda) + latestNoisySequenceValue*lambda).

I guess we use Subjects, but how exactly?

    public static void Main()
    {

        var rand = new Random();

        IObservable<double> sequence  = Observable
            .Interval(TimeSpan.FromMilliseconds(1000))
            .Select(value => value + rand.NextDouble());

        Func<double, double> addNoise = x => x + 10*(rand.NextDouble() - 0.5);

        IObservable<double> noisySequence = sequence.Select(addNoise);

        Subject<double> exponentialMovingAverage = new Subject<double>(); // ??? 


        sequence.Subscribe(value => Console.WriteLine("original sequence "+value));
        noisySequence.Subscribe(value => Console.WriteLine("noisy sequence " + value));
        exponentialMovingAverage.Subscribe(value => Console.WriteLine("ema sequence " + value));

        Console.ReadLine();
    }

Sputnik2513 asked Feb 11 '13 03:02

3 Answers

This is how you can attach state to a sequence with Scan. In this case it calculates the average of the most recent MaxSize values (for example, the last 10).

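// MaxSize is the window length, e.g. const int MaxSize = 10;
// Note: the seed list is created once, so every subscription shares it.
var movingAvg = noisySequence.Scan(new List<double>(),
    (buffer, value) =>
    {
        // Append the new value and drop the oldest once the buffer is full.
        buffer.Add(value);
        if (buffer.Count > MaxSize)
        {
            buffer.RemoveAt(0);
        }
        return buffer;
    }).Select(buffer => buffer.Average());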

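Alternatively, you could use Window (of which Buffer is a specialisation that collects each window into a list) to get an average. Note that Window(10) produces consecutive, non-overlapping windows, so this emits one average per block of 10 values rather than a sliding average.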

noisySequence.Window(10)
   .Select(window=>window.Average())
   .SelectMany(averageSequence=>averageSequence);

Lee Campbell answered Nov 15 '22 12:11


For many of these types of calculations, Buffer is the easiest way:

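var movingAverage = noisySequence.Buffer(/*last*/ 3,
    /*move forward*/ 1 /*at a time*/)
    // Skip the shorter buffers that can be flushed when the source completes.
    .Where(x => x.Count == 3)
    .Select(x => (x[0] + x[1] + x[2]) / 3.0);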
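
As a rough sketch, the same Buffer idea can be generalised to a window of any size. The name `SlidingAverage` and its `size` parameter are made up for illustration, and the code assumes the System.Reactive NuGet package. Buffers shorter than `size` are filtered out, since an overlapping Buffer may flush partially filled buffers when a finite source completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; // assumes the System.Reactive NuGet package

public static class SlidingAverageExtensions
{
    // Hypothetical helper: average of the last `size` values, advancing one value at a time.
    public static IObservable<double> SlidingAverage(
        this IObservable<double> source, int size)
    {
        return source
            .Buffer(size, 1)                  // overlapping buffers of up to `size` values
            .Where(buffer => buffer.Count == size) // ignore partially filled buffers
            .Select(buffer => buffer.Average());
    }
}

public static class SlidingAverageDemo
{
    public static void Main()
    {
        // A finite test source: 1, 2, 3, 4, 5, 6.
        var source = Observable.Range(1, 6).Select(i => (double)i);

        // Full buffers are [1,2,3], [2,3,4], [3,4,5], [4,5,6],
        // so the averages are 2, 3, 4 and 5.
        source.SlidingAverage(3)
              .Subscribe(avg => Console.WriteLine("moving average " + avg));
    }
}
```

With the question's timer-driven sequence, `noisySequence.SlidingAverage(3)` would play the same role as `movingAverage` above.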

If you need to carry state around, use the Scan operator, which is like Aggregate except that it yields the accumulated value after every element instead of only at the end.
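
To make the Aggregate/Scan comparison concrete, here is a minimal sketch using plain LINQ on an in-memory array, so it needs no Rx package. `RunningAggregate` is a hypothetical helper written only to mimic what Scan does; it is not part of LINQ or Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ScanSketch
{
    // Hypothetical IEnumerable analogue of Rx's Scan:
    // yields the accumulator after every element instead of only at the end.
    public static IEnumerable<TAcc> RunningAggregate<T, TAcc>(
        this IEnumerable<T> source, TAcc seed, Func<TAcc, T, TAcc> step)
    {
        var acc = seed;
        foreach (var item in source)
        {
            acc = step(acc, item);
            yield return acc;
        }
    }

    public static void Main()
    {
        var values = new[] { 1, 2, 3, 4 };

        // Aggregate folds the whole sequence into one final value.
        int total = values.Aggregate(0, (sum, x) => sum + x);
        Console.WriteLine(total); // 10

        // The Scan-style helper exposes every intermediate accumulator.
        var running = values.RunningAggregate(0, (sum, x) => sum + x);
        Console.WriteLine(string.Join(", ", running)); // 1, 3, 6, 10
    }
}
```

On an observable, Scan behaves the same way, except that each intermediate value is pushed to subscribers as the source ticks.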

Ana Betts answered Nov 15 '22 10:11


Thanks! Here is a solution using Scan:

    // Note: here lambda weights the previous EMA value, so it plays the role
    // of (1 - lambda) in the formula from the question.
    const double lambda = 0.99;
    IObservable<double> emaSequence = noisySequence.Scan(Double.NaN, (emaValue, value) =>
        {
            // NaN marks "no previous value": seed the EMA with the first sample.
            if (Double.IsNaN(emaValue))
            {
                return value;
            }
            return emaValue*lambda + value*(1-lambda);
        });
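
The Scan approach above could also be packaged as a reusable operator. The following is only a sketch under a few assumptions: the extension method name `ExponentialMovingAverage` is made up, the code needs the System.Reactive NuGet package, and `lambda` is taken to be the weight of the newest sample, as in the question's formula. It uses the seedless overload of Scan, which passes the first element through unchanged, so no NaN sentinel is needed.

```csharp
using System;
using System.Reactive.Linq; // assumes the System.Reactive NuGet package

public static class EmaExtensions
{
    // Hypothetical helper. lambda is the weight given to the newest sample:
    // ema = previousEma * (1 - lambda) + value * lambda
    public static IObservable<double> ExponentialMovingAverage(
        this IObservable<double> source, double lambda)
    {
        // Seedless Scan emits the first element as-is, which seeds the EMA.
        return source.Scan(
            (previousEma, value) => previousEma * (1 - lambda) + value * lambda);
    }
}

public static class EmaDemo
{
    public static void Main()
    {
        // Fixed samples instead of a timer, so the result is easy to check by hand.
        var samples = new[] { 10.0, 20.0, 30.0 }.ToObservable();

        // With lambda = 0.5 the EMA values are 10, 15 and 22.5.
        samples.ExponentialMovingAverage(0.5)
               .Subscribe(ema => Console.WriteLine("ema sequence " + ema));
    }
}
```

In the question's program this would replace the Subject: `var exponentialMovingAverage = noisySequence.ExponentialMovingAverage(lambda);`. Because the state lives inside Scan, each subscription gets its own running average.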

Sputnik2513 answered Nov 15 '22 11:11