I would like to set up an Rx subscription that can respond to an event right away, and then ignore subsequent events that happen within a specified "cooldown" period.
The out-of-the-box Throttle/Buffer methods respond only once the timeout has elapsed, which is not quite what I need.
Here is some code that sets up the scenario, and uses a Throttle (which isn't the solution I want):
    class Program
    {
        static Stopwatch sw = new Stopwatch();

        static void Main(string[] args)
        {
            var subject = new Subject<int>();
            var timeout = TimeSpan.FromMilliseconds(500);

            subject
                .Throttle(timeout)
                .Subscribe(DoStuff);

            var factory = new TaskFactory();

            sw.Start();

            factory.StartNew(() =>
            {
                Console.WriteLine("Batch 1 (no delay)");
                subject.OnNext(1);
            });

            factory.StartNewDelayed(1000, () =>
            {
                Console.WriteLine("Batch 2 (1s delay)");
                subject.OnNext(2);
            });

            factory.StartNewDelayed(1300, () =>
            {
                Console.WriteLine("Batch 3 (1.3s delay)");
                subject.OnNext(3);
            });

            factory.StartNewDelayed(1600, () =>
            {
                Console.WriteLine("Batch 4 (1.6s delay)");
                subject.OnNext(4);
            });

            Console.ReadKey();
            sw.Stop();
        }

        private static void DoStuff(int i)
        {
            Console.WriteLine("Handling {0} at {1}ms", i, sw.ElapsedMilliseconds);
        }
    }
The output of running this right now is:
Batch 1 (no delay)
Handling 1 at 508ms
Batch 2 (1s delay)
Batch 3 (1.3s delay)
Batch 4 (1.6s delay)
Handling 4 at 2114ms
Note that batch 2 isn't handled (which is fine!) because Throttle waits for 500ms to elapse between requests. Batch 3 is also not handled (which is less alright because it happened more than 500ms from batch 2) due to its proximity to batch 4.
What I'm looking for is something more like this:
Batch 1 (no delay)
Handling 1 at ~0ms
Batch 2 (1s delay)
Handling 2 at ~1000ms
Batch 3 (1.3s delay)
Batch 4 (1.6s delay)
Handling 4 at ~1600ms
Note that batch 3 wouldn't be handled in this scenario (which is fine!) because it occurs within 500ms of Batch 2.
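To pin down the rule being asked for, here is a minimal sketch in plain C# with no Rx involved. It assumes the events arrive in time order and that an event exactly `cooldownMs` after the last handled one is allowed through; `CooldownSketch` and `Filter` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class CooldownSketch
{
    // Hypothetical helper (not part of Rx): returns the values that would be
    // handled when each handled event starts a cooldown of cooldownMs.
    // Assumes the events are already sorted by timestamp.
    public static List<int> Filter(
        IEnumerable<(int Value, long AtMs)> events, long cooldownMs)
    {
        var handled = new List<int>();
        long? lastHandledAt = null;

        foreach (var (value, atMs) in events)
        {
            // Handle the event if nothing has been handled yet, or if the
            // cooldown started by the last handled event has elapsed.
            // Treating the exact boundary as "elapsed" is an assumption.
            if (lastHandledAt == null || atMs - lastHandledAt.Value >= cooldownMs)
            {
                handled.Add(value);
                lastHandledAt = atMs;
            }
            // Otherwise the event is dropped and does not extend the cooldown.
        }

        return handled;
    }
}
```

Calling `CooldownSketch.Filter(new[] { (1, 0L), (2, 1000L), (3, 1300L), (4, 1600L) }, 500)` with the timings from the scenario keeps 1, 2 and 4 and drops 3, which matches the output I'm after.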
EDIT:
Here is the implementation for the "StartNewDelayed" extension method that I use:
    /// <summary>Creates a Task that will complete after the specified delay.</summary>
    /// <param name="factory">The TaskFactory.</param>
    /// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
    /// <returns>A Task that will be completed after the specified duration.</returns>
    public static Task StartNewDelayed(
        this TaskFactory factory, int millisecondsDelay)
    {
        return StartNewDelayed(factory, millisecondsDelay, CancellationToken.None);
    }

    /// <summary>Creates a Task that will complete after the specified delay.</summary>
    /// <param name="factory">The TaskFactory.</param>
    /// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
    /// <param name="cancellationToken">The cancellation token that can be used to cancel the timed task.</param>
    /// <returns>A Task that will be completed after the specified duration and that's cancelable with the specified token.</returns>
    public static Task StartNewDelayed(this TaskFactory factory, int millisecondsDelay, CancellationToken cancellationToken)
    {
        // Validate arguments
        if (factory == null) throw new ArgumentNullException("factory");
        if (millisecondsDelay < 0) throw new ArgumentOutOfRangeException("millisecondsDelay");

        // Create the timed task
        var tcs = new TaskCompletionSource<object>(factory.CreationOptions);
        var ctr = default(CancellationTokenRegistration);

        // Create the timer but don't start it yet. If we start it now,
        // it might fire before ctr has been set to the right registration.
        var timer = new Timer(self =>
        {
            // Clean up both the cancellation token and the timer, and try to transition to completed
            ctr.Dispose();
            ((Timer)self).Dispose();
            tcs.TrySetResult(null);
        });

        // Register with the cancellation token.
        if (cancellationToken.CanBeCanceled)
        {
            // When cancellation occurs, cancel the timer and try to transition to cancelled.
            // There could be a race, but it's benign.
            ctr = cancellationToken.Register(() =>
            {
                timer.Dispose();
                tcs.TrySetCanceled();
            });
        }

        if (millisecondsDelay > 0)
        {
            // Start the timer and hand back the task...
            timer.Change(millisecondsDelay, Timeout.Infinite);
        }
        else
        {
            // Just complete the task, and keep execution on the current thread.
            ctr.Dispose();
            tcs.TrySetResult(null);
            timer.Dispose();
        }

        return tcs.Task;
    }
Here's my approach. It's similar to others that have gone before, but it doesn't suffer the over-zealous window production problem.
The desired function works a lot like Observable.Throttle but emits qualifying events as soon as they arrive rather than delaying for the duration of the throttle or sample period. For a given duration after a qualifying event, subsequent events are suppressed.
Given as a testable extension method:
    public static class ObservableExtensions
    {
        public static IObservable<T> SampleFirst<T>(
            this IObservable<T> source,
            TimeSpan sampleDuration,
            IScheduler scheduler = null)
        {
            scheduler = scheduler ?? Scheduler.Default;
            return source.Publish(ps =>
                ps.Window(() => ps.Delay(sampleDuration, scheduler))
                  .SelectMany(x => x.Take(1)));
        }
    }
The idea is to use the overload of Window that creates non-overlapping windows using a windowClosingSelector that uses the source time-shifted back by the sampleDuration. Each window will therefore: (a) be closed by the first element in it and (b) remain open until a new element is permitted. We then simply select the first element from each window.
The Publish extension method used above is not available in Rx 1.x. Here is an alternative:
    public static class ObservableExtensions
    {
        public static IObservable<T> SampleFirst<T>(
            this IObservable<T> source,
            TimeSpan sampleDuration,
            IScheduler scheduler = null)
        {
            scheduler = scheduler ?? Scheduler.Default;
            var sourcePub = source.Publish().RefCount();
            return sourcePub.Window(() => sourcePub.Delay(sampleDuration, scheduler))
                            .SelectMany(x => x.Take(1));
        }
    }
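Because the scheduler is a parameter, the operator can be exercised in virtual time. The following is a rough sketch of that, replaying the timings from the question (0ms, 1000ms, 1300ms, 1600ms, with a 500ms sample duration). It assumes the System.Reactive package and uses its HistoricalScheduler; the class names `SampleFirstExtensions` and `SampleFirstDemo` are made up for this example, and the operator is repeated only so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SampleFirstExtensions
{
    // Same operator as in the answer, repeated so the sketch is standalone.
    public static IObservable<T> SampleFirst<T>(
        this IObservable<T> source,
        TimeSpan sampleDuration,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        return source.Publish(ps =>
            ps.Window(() => ps.Delay(sampleDuration, scheduler))
              .SelectMany(x => x.Take(1)));
    }
}

public static class SampleFirstDemo
{
    // Pushes the four values from the question through SampleFirst while
    // advancing virtual time by hand, and returns the values that got through.
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler();
        var subject = new Subject<int>();
        var handled = new List<int>();

        using (subject
            .SampleFirst(TimeSpan.FromMilliseconds(500), scheduler)
            .Subscribe(handled.Add))
        {
            subject.OnNext(1);                                      // t = 0ms
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1000));
            subject.OnNext(2);                                      // t = 1000ms
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(300));
            subject.OnNext(3);                                      // t = 1300ms
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(300));
            subject.OnNext(4);                                      // t = 1600ms
        }

        return handled;
    }
}
```

If the operator behaves as described, `SampleFirstDemo.Run()` should return 1, 2 and 4: value 3 lands inside the window opened by 2, while 4 arrives after that window has closed at 1500ms.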