Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rx: How can I respond immediately, and throttle subsequent requests

I would like to set up an Rx subscription that can respond to an event right away, and then ignore subsequent events that happen within a specified "cooldown" period.

The out of the box Throttle/Buffer methods respond only once the timeout has elapsed, which is not quite what I need.

Here is some code that sets up the scenario, and uses a Throttle (which isn't the solution I want):

class Program {     static Stopwatch sw = new Stopwatch();      static void Main(string[] args)     {         var subject = new Subject<int>();         var timeout = TimeSpan.FromMilliseconds(500);          subject             .Throttle(timeout)             .Subscribe(DoStuff);          var factory = new TaskFactory();                   sw.Start();          factory.StartNew(() =>         {             Console.WriteLine("Batch 1 (no delay)");             subject.OnNext(1);         });          factory.StartNewDelayed(1000, () =>         {             Console.WriteLine("Batch 2 (1s delay)");             subject.OnNext(2);         });           factory.StartNewDelayed(1300, () =>         {             Console.WriteLine("Batch 3 (1.3s delay)");             subject.OnNext(3);         });          factory.StartNewDelayed(1600, () =>         {             Console.WriteLine("Batch 4 (1.6s delay)");             subject.OnNext(4);         });          Console.ReadKey();         sw.Stop();     }      private static void DoStuff(int i)     {         Console.WriteLine("Handling {0} at {1}ms", i, sw.ElapsedMilliseconds);     } } 

The output of running this right now is:

Batch 1 (no delay)

Handling 1 at 508ms

Batch 2 (1s delay)

Batch 3 (1.3s delay)

Batch 4 (1.6s delay)

Handling 4 at 2114ms

Note that batch 2 isn't handled (which is fine!) because we wait for 500ms to elapse between requests due to the nature of throttle. Batch 3 is also not handled, (which is less alright because it happened more than 500ms from batch 2) due to its proximity to Batch 4.

What I'm looking for is something more like this:

Batch 1 (no delay)

Handling 1 at ~0ms

Batch 2 (1s delay)

Handling 2 at ~1000s

Batch 3 (1.3s delay)

Batch 4 (1.6s delay)

Handling 4 at ~1600s

Note that batch 3 wouldn't be handled in this scenario (which is fine!) because it occurs within 500ms of Batch 2.

EDIT:

Here is the implementation for the "StartNewDelayed" extension method that I use:

/// <summary>Creates a Task that will complete after the specified delay.</summary> /// <param name="factory">The TaskFactory.</param> /// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param> /// <returns>A Task that will be completed after the specified duration.</returns> public static Task StartNewDelayed(     this TaskFactory factory, int millisecondsDelay) {     return StartNewDelayed(factory, millisecondsDelay, CancellationToken.None); }  /// <summary>Creates a Task that will complete after the specified delay.</summary> /// <param name="factory">The TaskFactory.</param> /// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param> /// <param name="cancellationToken">The cancellation token that can be used to cancel the timed task.</param> /// <returns>A Task that will be completed after the specified duration and that's cancelable with the specified token.</returns> public static Task StartNewDelayed(this TaskFactory factory, int millisecondsDelay, CancellationToken cancellationToken) {     // Validate arguments     if (factory == null) throw new ArgumentNullException("factory");     if (millisecondsDelay < 0) throw new ArgumentOutOfRangeException("millisecondsDelay");      // Create the timed task     var tcs = new TaskCompletionSource<object>(factory.CreationOptions);     var ctr = default(CancellationTokenRegistration);      // Create the timer but don't start it yet.  If we start it now,     // it might fire before ctr has been set to the right registration.     var timer = new Timer(self =>     {         // Clean up both the cancellation token and the timer, and try to transition to completed         ctr.Dispose();         ((Timer)self).Dispose();         tcs.TrySetResult(null);     });      // Register with the cancellation token.     if (cancellationToken.CanBeCanceled)     {         // When cancellation occurs, cancel the timer and try to transition to cancelled.         // There could be a race, but it's benign.         ctr = cancellationToken.Register(() =>         {             timer.Dispose();             tcs.TrySetCanceled();         });     }      if (millisecondsDelay > 0)     {         // Start the timer and hand back the task...         timer.Change(millisecondsDelay, Timeout.Infinite);     }     else     {         // Just complete the task, and keep execution on the current thread.         ctr.Dispose();         tcs.TrySetResult(null);         timer.Dispose();     }      return tcs.Task; } 
like image 483
Andrew Anderson Avatar asked Nov 03 '11 17:11

Andrew Anderson


1 Answers

Here's my approach. It's similar to others that have gone before, but it doesn't suffer the over-zealous window production problem.

The desired function works a lot like Observable.Throttle but emits qualifying events as soon as they arrive rather than delaying for the duration of the throttle or sample period. For a given duration after a qualifying event, subsequent events are suppressed.

Given as a testable extension method:

public static class ObservableExtensions {     public static IObservable<T> SampleFirst<T>(         this IObservable<T> source,         TimeSpan sampleDuration,         IScheduler scheduler = null)     {         scheduler = scheduler ?? Scheduler.Default;         return source.Publish(ps =>              ps.Window(() => ps.Delay(sampleDuration,scheduler))               .SelectMany(x => x.Take(1)));     } } 

The idea is to use the overload of Window that creates non-overlapping windows using a windowClosingSelector that uses the source time-shifted back by the sampleDuration. Each window will therefore: (a) be closed by the first element in it and (b) remain open until a new element is permitted. We then simply select the first element from each window.

Rx 1.x Version

The Publish extension method used above is not available in Rx 1.x. Here is an alternative:

public static class ObservableExtensions {     public static IObservable<T> SampleFirst<T>(         this IObservable<T> source,         TimeSpan sampleDuration,         IScheduler scheduler = null)     {         scheduler = scheduler ?? Scheduler.Default;         var sourcePub = source.Publish().RefCount();         return sourcePub.Window(() => sourcePub.Delay(sampleDuration,scheduler))                         .SelectMany(x => x.Take(1));     } } 
like image 126
James World Avatar answered Sep 18 '22 21:09

James World