Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

use RX to trigger events at varying times?

I've got a large collection of simple pair class:

public class Pair { public DateTime Timestamp; public double Value; }

They are sorted by the ascending Timestamp. I want to trigger an event with the Value (say, Action<double>) for each item in the list at the appropriate time. The times are in the past, so I need to normalize the Timestamps such that the first one in the list is "now". Can we set this up with the Reactive Extensions such that it triggers the next event after on the difference in time between two items?

like image 557
Brannon Avatar asked May 03 '12 22:05

Brannon


2 Answers

Say pairs is your sequence:

var obs = pairs.OrderBy(p => p.Timestamp).ToObservable();

Now obs is pairs as an ordered observable.

Observable.Zip(
    obs,
    obs.Take(1).Concat(obs),
    (pair1, pair2) => Observable.Timer(pair1.Timestamp - pair2.Timestamp)
      .Select(_ => pair1.Value))
.Concat()
.Subscribe(/* Do something here */);

The zip takes care of turning the absolute times into offsets. It will take the sequence and join it with itself, but offset by one, as follows

Original 1--2--4--7--11
Offset   1--1--2--4--7--11
Joined   0--1--2--3--4

This new value is then put into the Observable.Timer to delay it the appropriate amount. The final Concat flattens the result from an IObservable<IObservable<double>> into an IObservable<double>. This assumes that your sequence is ordered.

like image 197
Matthew Finlay Avatar answered Nov 14 '22 15:11

Matthew Finlay


If by "using Rx" you allow me to just use the Rx schedulers, then this is a very easy solution:

Action<double> action =
    x =>
        Console.WriteLine(x);

var ts0 = pairs.Select(p => p.Timestamp).Min();

pairs
    .ForEach(p => 
        Scheduler
            .ThreadPool
            .Schedule(
                p.Timestamp.Subtract(ts0),
                () => action(p.Value)));

This uses the System.Interactive extension ForEach, but you could just use a regular foreach loop to load up the scheduler.

I've tested the code with the following dummy data:

var pairs = new []
{
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 30), Value = 1.1, },
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 45), Value = 1.2, },
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 40), Value = 1.3, },
};

I hope this helps.

like image 2
Enigmativity Avatar answered Nov 14 '22 16:11

Enigmativity