Reactive Extensions sliding time window

I have a sequence of stock ticks coming in, and I want to take all the data from the last hour and do some processing on it. I am trying to achieve this with Reactive Extensions 2.0. Another post suggested using Interval, but I think that is deprecated.

NeddySpaghetti asked Jul 19 '12 10:07

2 Answers

Would this extension method solve your problem?

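using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;

// Extension methods must be declared inside a static class.
public static IObservable<T[]> RollingBuffer<T>(
    this IObservable<T> @this,
    TimeSpan buffering)
{
    return Observable.Create<T[]>(o =>
    {
        // Timestamped values currently inside the window, oldest first.
        var list = new LinkedList<Timestamped<T>>();
        return @this.Timestamp().Subscribe(tx =>
        {
            list.AddLast(tx);
            // Measure the cutoff from the newest item's own timestamp, so the
            // comparison uses the same clock as Timestamp() and the newest
            // item is never evicted (list.First cannot become null).
            var cutoff = tx.Timestamp.Subtract(buffering);
            while (list.First.Value.Timestamp < cutoff)
            {
                list.RemoveFirst();
            }
            // Emit a snapshot of the window on every incoming value.
            o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
        }, ex => o.OnError(ex), () => o.OnCompleted());
    });
}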
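
To see which values survive in the window, the eviction rule can be tried without a live feed. The following is only a minimal sketch: `SlidingWindow<T>` is a hypothetical helper (not part of Rx) that takes the timestamp as an argument instead of reading a clock, and it assumes a non-negative window length.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of Rx: the same eviction rule as a rolling
// buffer, but with the timestamp passed in so the result is deterministic.
public sealed class SlidingWindow<T>
{
    private readonly TimeSpan _span;   // assumed to be non-negative
    private readonly LinkedList<(DateTimeOffset At, T Value)> _items =
        new LinkedList<(DateTimeOffset At, T Value)>();

    public SlidingWindow(TimeSpan span) => _span = span;

    // Adds a value and returns everything still inside the window, oldest first.
    public T[] Add(DateTimeOffset at, T value)
    {
        _items.AddLast((at, value));
        var cutoff = at - _span;
        // The newest item is never older than the cutoff, so the list cannot empty out.
        while (_items.First.Value.At < cutoff)
        {
            _items.RemoveFirst();
        }
        return _items.Select(x => x.Value).ToArray();
    }
}

public static class SlidingWindowDemo
{
    public static void Main()
    {
        var window = new SlidingWindow<decimal>(TimeSpan.FromHours(1));
        var start = new DateTimeOffset(2012, 7, 19, 9, 0, 0, TimeSpan.Zero);

        Console.WriteLine(window.Add(start, 10m).Length);                 // 1
        Console.WriteLine(window.Add(start.AddMinutes(30), 11m).Length);  // 2
        Console.WriteLine(window.Add(start.AddMinutes(61), 12m).Length);  // 2 (the 09:00 tick is evicted)
    }
}
```

A tick is dropped only once a newer tick arrives more than the window length after it, so the window is refreshed on each incoming value rather than on a timer.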
Enigmativity answered Oct 05 '22 00:10

You are looking for the Window operators! Here is a lengthy article I wrote on working with sequences of coincidence (overlapping windows of sequences): http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html

So if you wanted to build a rolling aggregate (a sum in this example; an average works the same way), you could use this sort of code:

//TestScheduler and Recorded<T> come from the Microsoft.Reactive.Testing namespace
var scheduler = new TestScheduler();
var notifications = new Recorded<Notification<double>>[30];
//Push the values 0 to 29 into an observable sequence, 0.1 seconds apart (1,000,000 ticks = 0.1 s)
for (int i = 0; i < notifications.Length; i++)
{
  notifications[i] = new Recorded<Notification<double>>(i*1000000, Notification.CreateOnNext<double>(i));
}
var source = scheduler.CreateHotObservable(notifications);

source.GroupJoin(
      source,   //Take values from myself
      _=>Observable.Return(0, scheduler), //Just the first value
      _=>Observable.Timer(TimeSpan.FromSeconds(1), scheduler),//Window period, change to 1hour
      (lhs, rhs)=>rhs.Sum())    //Aggregation you want to do.
    .Merge()    //rhs.Sum() is itself an IObservable<double>, so flatten before printing
    .Subscribe(i=>Console.WriteLine (i));
scheduler.Start();

And we can see it output the rolling sums as it receives values:

0, 1, 3, 6, 10, 15, 21, 28...
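
If it is acceptable for the window to advance on a fixed schedule rather than on every tick, the built-in `Buffer` overload that takes a time span and a time shift may be a simpler route to overlapping windows. The sketch below is an illustration only: `WindowSums` is a hypothetical helper name, and `HistoricalScheduler` is used as a virtual clock so that no real time has to pass.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class OverlappingBuffers
{
    // Hypothetical helper: sums the values collected in each overlapping time window.
    public static IObservable<double> WindowSums(
        IObservable<double> source, TimeSpan span, TimeSpan shift, IScheduler scheduler)
    {
        return source
            .Buffer(span, shift, scheduler)   // a new buffer opens every `shift` and lasts for `span`
            .Select(values => values.Sum());
    }

    public static void Main()
    {
        var scheduler = new HistoricalScheduler();   // virtual clock, advanced by hand
        var ticks = new Subject<double>();           // stand-in for the stock feed
        var sums = new List<double>();

        using (WindowSums(ticks, TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(500), scheduler)
            .Subscribe(s => sums.Add(s)))
        {
            ticks.OnNext(1);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(600));
            ticks.OnNext(2);
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(400));   // the first buffer closes at 1 s
        }

        Console.WriteLine(string.Join(", ", sums));
    }
}
```

For the one-hour case in the question, `span` would be `TimeSpan.FromHours(1)` and `shift` would be however often a fresh result is wanted. Unlike the GroupJoin approach, results are produced when a buffer closes, not when a value arrives.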

Lee Campbell answered Oct 04 '22 23:10