
reset count above max time interval, in Rx count-based aggregation

Count-based filtering without a time constraint

var filteredStream = changes.Buffer(3);  // IObservable<IList<T>>, where T is the element type of changes

How to introduce inactivity reset?


But how can I introduce a timeout, TimeSpan tooLong, so that the count restarts from zero whenever the interval between two consecutive values exceeds this maximum?

Cel asked Mar 04 '12 12:03



1 Answer

I think this is what you're after.

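// Emits only after source has been silent for tooLong, i.e. once per long gap
var longGap = source.Throttle(tooLong);
var filtered = source
  .Window(() => longGap)  // Closes the current window and opens a new one at every long gap
  // Buffer also emits a short, partial list when its window closes; the Where drops it
  .Select(window => window.Buffer(maxItems).Where(l => l.Count == maxItems))
  .Switch();  // Flattens the IObservable<IObservable<IList<T>>> to IObservable<IList<T>>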
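
To make the intended behaviour concrete, here is a minimal sketch of the same counting rule in plain C#, without Rx, applied to pre-recorded (time, value) pairs. The class GapResetDemo and the method BatchWithReset are hypothetical names made up for illustration; they are not part of Rx or of the pipeline above, and the sketch assumes a gap counts as "too long" only when it is strictly greater than tooLong.

```csharp
using System;
using System.Collections.Generic;

public static class GapResetDemo
{
    // Hypothetical helper (not an Rx API): groups values into batches of maxItems,
    // discarding the partial batch whenever two consecutive values are more than
    // tooLong apart.
    public static List<List<T>> BatchWithReset<T>(
        IEnumerable<(TimeSpan Time, T Value)> items, int maxItems, TimeSpan tooLong)
    {
        var batches = new List<List<T>>();
        var current = new List<T>();
        TimeSpan? previous = null;

        foreach (var (time, value) in items)
        {
            // A gap longer than tooLong restarts the count from zero.
            if (previous.HasValue && time - previous.Value > tooLong)
                current.Clear();

            current.Add(value);
            previous = time;

            // A full batch is emitted and a fresh one is started.
            if (current.Count == maxItems)
            {
                batches.Add(current);
                current = new List<T>();
            }
        }
        return batches;
    }
}
```

For example, with maxItems = 3 and tooLong = 5 seconds, values a, b at 0 s and 1 s followed by c, d, e, f at 10 s, 11 s, 12 s and 13 s would give the single batch c, d, e: the pair a, b is dropped by the 9-second gap, and f never reaches a full batch.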
Matthew Finlay answered Nov 09 '22 22:11
