Reactive Throttle returning all items added within the TimeSpan

Given an IObservable<T>, is there a way to use Throttle behaviour (reset a timer when an item is added) but have it return a collection of all the items added within that time?

Buffer provides similar functionality in that it chunks the data up into an IList<T> on every time span or count. But I need the timer to reset each time an item is added.

I've seen a similar question here, "Does reactive extensions support rolling buffers?", but the answers don't seem ideal and it's a little old, so I wondered whether the release version of Rx-Main now supports this functionality out of the box.

asked Jan 13 '12 at 11:01 by RichK

1 Answer

I amended Colonel Panic's BufferUntilInactive operator by adding a Publish component, so that it works correctly with cold observables too:

/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers, which are produced based on time and activity,
/// using the specified scheduler to run timers.</summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
{
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
        published
            .Window(() => published.Throttle(dueTime, scheduler))
            .SelectMany(window => window.ToList())
    );
}

For completeness, I've also added an optional IScheduler parameter, which specifies the scheduler on which the timers run.
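
A minimal usage sketch, assuming the System.Reactive package is referenced: it feeds the operator a cold source and runs everything in virtual time by passing a HistoricalScheduler as the optional scheduler. The Demo class, the CollectBatches helper, and the sample timings are hypothetical and only serve the illustration; the operator itself is repeated so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class Demo
{
    // The operator from the answer, repeated so this sketch is self-contained.
    public static IObservable<IList<T>> BufferUntilInactive<T>(
        this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
    {
        scheduler ??= Scheduler.Default;
        return source.Publish(published =>
            published
                .Window(() => published.Throttle(dueTime, scheduler))
                .SelectMany(window => window.ToList()));
    }

    // Hypothetical helper: runs the demo in virtual time and returns each batch as text.
    public static List<string> CollectBatches()
    {
        var scheduler = new HistoricalScheduler();

        // Cold source with assumed timings: 1, 2, 3 arrive 100 ms apart,
        // then 4 and 5 arrive after a long pause.
        // Concat with Never keeps the sequence open, so only inactivity closes a buffer.
        var source = new[]
            {
                (Value: 1, AtMs: 100), (Value: 2, AtMs: 200), (Value: 3, AtMs: 300),
                (Value: 4, AtMs: 1500), (Value: 5, AtMs: 1600)
            }
            .Select(x => Observable.Timer(TimeSpan.FromMilliseconds(x.AtMs), scheduler)
                                   .Select(_ => x.Value))
            .Merge()
            .Concat(Observable.Never<int>());

        var batches = new List<string>();
        using var subscription = source
            .BufferUntilInactive(TimeSpan.FromMilliseconds(500), scheduler)
            .Subscribe(batch => batches.Add(string.Join(",", batch)));

        // Advance virtual time by 3 seconds; no real waiting takes place.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(3));
        return batches;
    }

    public static void Main()
    {
        foreach (var batch in CollectBatches())
            Console.WriteLine(batch);
        // Prints "1,2,3" (closed 500 ms after item 3), then "4,5" (closed 500 ms after item 5).
    }
}
```

In this sketch no empty lists appear during the quiet period between the two bursts, because a buffer closes only after an item has arrived and the stream has then been silent for the whole dueTime. Since the source is cold, dropping Publish would let each Throttle subscription start its own copy of the timers, which is the situation the amended operator is meant to avoid.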

answered Sep 28 '22 at 23:09 by Theodor Zoulias
