Reactive Extensions: Throttle/Sample with varying interval

I have an IObservable that produces values at random intervals and I want to throttle this sequence. One thing I have found out is that the Throttle operator's definition of "throttling" is not the same as mine.

Throttle only produces a value after a period of silence lasting the specified interval (it then produces the last value seen). I thought throttling would mean producing values at the specified interval (unless there's silence, of course).

For example, I expected Observable.Interval(TimeSpan.FromMilliseconds(100)).Select((_, i) => i).Throttle(TimeSpan.FromMilliseconds(200)) to produce (modulo any performance/timing issues) the even numbers, since I am throttling it to "half speed". However, that sequence produces no values at all, because there is never a period of silence lasting 200 ms.
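To make the silence-based behavior concrete, here is a minimal sketch that simulates it over a list of timestamped values, without using Rx itself. The names ThrottleSketch, Run, and observedUntilMs are made up for this illustration, and counting a gap exactly equal to the due time as "silent enough" is an assumption; the real operator runs on timers, so ties can go either way.

```csharp
using System;
using System.Collections.Generic;

public static class ThrottleSketch
{
    // events: (arrival time in ms, value), sorted by arrival time.
    // observedUntilMs: the moment we stop watching the (possibly endless) stream.
    // A value gets through only if nothing else arrives for at least dueMs after it.
    public static List<T> Run<T>(
        IReadOnlyList<(long TimeMs, T Value)> events, long dueMs, long observedUntilMs)
    {
        var output = new List<T>();
        for (int i = 0; i < events.Count; i++)
        {
            // The silence after this value ends at the next arrival,
            // or at the end of the observation window for the last value.
            long silenceEnd = i + 1 < events.Count ? events[i + 1].TimeMs : observedUntilMs;
            if (silenceEnd - events[i].TimeMs >= dueMs)
            {
                output.Add(events[i].Value);
            }
        }
        return output;
    }

    public static void Main()
    {
        // Values 0..9 arriving every 100 ms: never 200 ms of silence.
        var steady = new List<(long TimeMs, int Value)>();
        for (int i = 0; i < 10; i++) steady.Add(((i + 1) * 100L, i));
        Console.WriteLine(Run(steady, 200, 1000).Count); // prints 0

        // A 300 ms pause after value 2 lets exactly that value through.
        var paused = new List<(long TimeMs, int Value)>
        {
            (100, 0), (200, 1), (300, 2), (600, 3), (700, 4)
        };
        Console.WriteLine(string.Join(", ", Run(paused, 200, 700))); // prints 2
    }
}
```

The steady stream yields nothing because every gap is 100 ms, which matches the empty result described above.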

I then discovered that Sample has the "throttling" behavior I want. Observable.Interval(TimeSpan.FromMilliseconds(100)).Select((_, i) => i).Sample(TimeSpan.FromMilliseconds(200)) produces (again, modulo any performance/timing issues) the sequence of even numbers.
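The fixed-interval sampling can be simulated the same way, again without Rx. This is only a sketch: SampleSketch, Run, and observedUntilMs are hypothetical names, and the 10 ms of jitter added to each arrival is an arbitrary assumption so that no value lands exactly on a sample tick (in a real run that tie is a race).

```csharp
using System;
using System.Collections.Generic;

public static class SampleSketch
{
    // events: (arrival time in ms, value), sorted by arrival time.
    // At every multiple of periodMs, emit the most recent value that
    // arrived since the previous tick; emit nothing if none arrived.
    public static List<T> Run<T>(
        IReadOnlyList<(long TimeMs, T Value)> events, long periodMs, long observedUntilMs)
    {
        var output = new List<T>();
        int next = 0; // index of the first event not yet consumed
        for (long tick = periodMs; tick <= observedUntilMs; tick += periodMs)
        {
            bool hasNew = false;
            T latest = default(T);
            while (next < events.Count && events[next].TimeMs <= tick)
            {
                latest = events[next].Value;
                hasNew = true;
                next++;
            }
            if (hasNew) output.Add(latest);
        }
        return output;
    }

    public static void Main()
    {
        // Values 0..9, nominally every 100 ms, each arriving 10 ms late.
        var jittered = new List<(long TimeMs, int Value)>();
        for (int i = 0; i < 10; i++) jittered.Add(((i + 1) * 100L + 10, i));

        // Sampling every 200 ms keeps every other value.
        Console.WriteLine(string.Join(", ", Run(jittered, 200, 1000))); // prints 0, 2, 4, 6, 8
    }
}
```

If the values arrived slightly early instead of slightly late, the same simulation would yield the odd numbers, which is why the result is only stated "modulo timing issues".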

However, I have one other problem: the interval varies, depending on the last "sampled" value. What I want is to write an operator that looks like this:

public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector);

The intervalSelector parameter produces the interval for the next sample, and the first sample... is either taken at the first value or from an additional parameter, I don't care.

I tried writing this but I ended up with a large, convoluted construction that did not work quite right. My question is: can I build this using the existing operators (i.e., as a one-liner)?

R. Martinho Fernandes asked Aug 16 '10 03:08


1 Answer

Many hours later, and with some sleep on it, I got it.

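public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector)
{
    // TimeInterval pairs each value with the time elapsed since the previous one.
    return source.TimeInterval()
                 // Accumulator: (time left to wait, emit this value?, the value).
                 .Scan(Tuple.Create(TimeSpan.Zero, false, default(T)), (acc, v) =>
                 {
                     if (v.Interval >= acc.Item1)
                     {
                         // Waited long enough: emit and start a new interval.
                         return Tuple.Create(intervalSelector(v.Value), true, v.Value);
                     }
                     // Still waiting: subtract the elapsed time and suppress the value.
                     return Tuple.Create(acc.Item1 - v.Interval, false, v.Value);
                 })
                 .Where(t => t.Item2)
                 .Select(x => x.Item3);
}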

This works as I want: each time it produces a value x, it suppresses further values until intervalSelector(x) has elapsed, then lets the next value through.
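To see the accumulator at work, the following sketch replays the same bookkeeping over a list of timestamped values in plain C#, with no Rx dependency. It is an illustration only: VariableSampleSketch and Run are hypothetical names, times are plain milliseconds instead of TimeSpan, and the stream is assumed to be subscribed at time 0.

```csharp
using System;
using System.Collections.Generic;

public static class VariableSampleSketch
{
    // events: (arrival time in ms, value), sorted by arrival time.
    // intervalSelectorMs: how long to stay quiet after emitting a given value.
    public static List<T> Run<T>(
        IReadOnlyList<(long TimeMs, T Value)> events, Func<T, long> intervalSelectorMs)
    {
        var output = new List<T>();
        long remaining = 0; // time left to wait; starts at zero like the Scan seed
        long previous = 0;  // arrival time of the previous value (0 = subscription)
        foreach (var (timeMs, value) in events)
        {
            long gap = timeMs - previous; // what TimeInterval would report
            previous = timeMs;
            if (gap >= remaining)
            {
                // Waited long enough: emit and ask for the next waiting time.
                output.Add(value);
                remaining = intervalSelectorMs(value);
            }
            else
            {
                // Still waiting: count this gap against the remaining time.
                remaining -= gap;
            }
        }
        return output;
    }

    public static void Main()
    {
        // Values 0..9 arriving every 100 ms.
        var ticks = new List<(long TimeMs, int Value)>();
        for (int i = 0; i < 10; i++) ticks.Add(((i + 1) * 100L, i));

        // A constant 200 ms interval behaves like the fixed-rate case.
        Console.WriteLine(string.Join(", ", Run(ticks, v => 200))); // prints 0, 2, 4, 6, 8

        // An interval that grows with the emitted value: (v + 1) * 100 ms.
        Console.WriteLine(string.Join(", ", Run(ticks, v => (v + 1) * 100L))); // prints 0, 1, 3, 7
    }
}
```

In the second run, emitting 3 at 400 ms asks for 400 ms of quiet, so 4, 5, and 6 are dropped and 7 (at 800 ms) is the next value through. Note that this approach emits the first value to arrive once the interval has elapsed, rather than the latest value seen at a timer tick as the built-in Sample does.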

R. Martinho Fernandes answered Sep 18 '22 07:09