Given an IObservable<T>, is there a way to get Throttle behaviour (reset a timer each time an item is added), but have it return a collection of all the items added within that time?
Buffer provides similar functionality in that it chunks the data up into an IList<T> on every time span or count. But I need that time to reset each time an item is added.
I've seen a similar question, "Does reactive extensions support rolling buffers?", but the answers don't seem ideal and it's a little old, so I wondered whether the release version of Rx-Main now supports this functionality out of the box.
I amended Colonel Panic's BufferUntilInactive operator by adding a Publish component, so that the source is subscribed only once and the operator works correctly with cold observables too:
    /// <summary>Projects each element of an observable sequence into consecutive
    /// non-overlapping buffers, which are produced based on time and activity,
    /// using the specified scheduler to run timers.</summary>
    public static IObservable<IList<T>> BufferUntilInactive<T>(
        this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
    {
        scheduler ??= Scheduler.Default;
        return source.Publish(published =>
            published
                .Window(() => published.Throttle(dueTime, scheduler))
                .SelectMany(window => window.ToList())
        );
    }
For completeness, I've also added an optional IScheduler parameter, which configures the scheduler on which the timers run.
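To make the behaviour concrete, here is a minimal usage sketch. It assumes the System.Reactive package and repeats the operator so the snippet is self-contained. The class names ObservableExtensions and Demo are hypothetical, and HistoricalScheduler is used only so that time can be advanced by hand instead of waiting on real timers.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical container class; extension methods must live in a static class.
public static class ObservableExtensions
{
    public static IObservable<IList<T>> BufferUntilInactive<T>(
        this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
    {
        scheduler ??= Scheduler.Default;
        return source.Publish(published =>
            published
                .Window(() => published.Throttle(dueTime, scheduler))
                .SelectMany(window => window.ToList())
        );
    }
}

// Hypothetical demo class, not part of the original answer.
public static class Demo
{
    public static List<string> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual time, advanced manually
        var source = new Subject<int>();
        var batches = new List<string>();

        using var subscription = source
            .BufferUntilInactive(TimeSpan.FromSeconds(1), scheduler)
            .Subscribe(batch => batches.Add(string.Join(",", batch)));

        // Three items, each less than one second after the previous one:
        // every item resets the inactivity timer, so they share one buffer.
        source.OnNext(1);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));
        source.OnNext(2);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));
        source.OnNext(3);

        // Two quiet seconds: the timer elapses and the first buffer is emitted.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2));

        // A later item starts a new buffer, emitted after the next quiet period.
        source.OnNext(4);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2));

        return batches;
    }
}
```

Under these assumptions, Demo.Run() should return two entries, "1,2,3" and "4". No empty buffers are produced during quiet periods, because a window is closed only when Throttle emits, and Throttle emits only after at least one item has arrived.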