IObservable filteredStream = changes.Buffer(3);
But how to introduce a timeout TimeSpan tooLong
so that the counting would restart from zero whenever the interval between two values exceeds this maximum?
I think this is what you're after.
var longGap = source.Throttle(tooLong);
var filtered = source
.Window(() => { return longGap; }) // Gives a window between every longGap
.Select(io => io.Buffer(maxItems).Where(l => l.Count == maxItems))
.Switch(); // Flattens the IObservable<IObservable<IList>> to IObservable<IList>
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With