I have an IObservable
that produces values at random intervals and I want to throttle this sequence. One thing I have found out is that the Throttle
operator's definition of "throttling" is not the same as mine.
Throttle
only produces values after the specified interval elapses with silence (it produces the last value seen). I thought throttling would mean producing values at the specified interval (unless there's silence, of course).
Say, I expected Observable.Interval(100).Select((_,i) => i).Throttle(200)
to produce (modulo any performance/timing issues) the even numbers, since I am throttling it to "half-speed". However that sequence produces no value at all, because there's never a period of silence of length 200.
So, I discovered that Sample
actually does the "throttling" behavior I want. Observable.Interval(100).Select((_,i) => i).Sample(200)
produces (again, modulo any performance/timing issues) the sequence of even numbers.
However, I have one other problem: the interval varies, depending on the last "sampled" value. What I want is to write an operator that looks like this:
public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector);
The intervalSelector
parameter produces the interval for the next sample, and the first sample... is either taken at the first value or from an additional parameter, I don't care.
I tried writing this but I ended up with a large convoluted construction that did not work quite right. My question is, can I build this using the existing operators (aka, with a one-liner)?
Many hours later, and with some sleep on it, I got it.
public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector)
{
return source.TimeInterval()
.Scan(Tuple.Create(TimeSpan.Zero, false, default(T)), (acc, v) =>
{
if(v.Interval >= acc.Item1)
{
return Tuple.Create(intervalSelector(v.Value), true, v.Value);
}
return Tuple.Create(acc.Item1 - v.Interval, false, v.Value);
})
.Where(t => t.Item2)
.Select(x => x.Item3);
}
This works as I want: each time it produces a value x
, it stops producing values until intervalSelector(x)
time passes.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With