Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to throttle event stream using RX?

I want to effectively throttle an event stream, so that my delegate is called when the first event is received but then not for 1 second if subsequent events are received. After expiry of that timeout (1 second), if a subsequent event was received I want my delegate to be called.

Is there a simple way to do this using Reactive Extensions?

Sample code:

static void Main(string[] args) {     Console.WriteLine("Running...");      var generator = Observable         .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)         .Timestamp();      var builder = new StringBuilder();      generator         .Sample(TimeSpan.FromSeconds(1))         .Finally(() => Console.WriteLine(builder.ToString()))         .Subscribe(feed =>                    builder.AppendLine(string.Format("Observed {0:000}, generated at {1}, observed at {2}",                                                     feed.Value,                                                     feed.Timestamp.ToString("mm:ss.fff"),                                                     DateTime.Now.ToString("mm:ss.fff"))));      Console.ReadKey(); } 

Current output:

Running... Observed 064, generated at 41:43.602, observed at 41:43.602 Observed 100, generated at 41:44.165, observed at 41:44.602 

But I want to observe (timestamps obviously will change)

Running... Observed 001, generated at 41:43.602, observed at 41:43.602 .... Observed 100, generated at 41:44.165, observed at 41:44.602 
like image 663
Alex Avatar asked Jul 09 '10 08:07

Alex


2 Answers

Okay,

you have 3 scenarios here:

1) I would like to get one value of the event stream every second. means: that if it produces more events per second, you will get a always bigger buffer.

observableStream.Throttle(timeSpan) 

2) I would like to get the latest event, that was produced before the second happens means: other events get dropped.

observableStream.Sample(TimeSpan.FromSeconds(1)) 

3) you would like to get all events, that happened in the last second. and that every second

observableStream.BufferWithTime(timeSpan) 

4) you want to select what happens in between the second with all the values, till the second has passed, and your result is returned

observableStream.CombineLatest(Observable.Interval(1000), selectorOnEachEvent) 
like image 119
cRichter Avatar answered Sep 20 '22 20:09

cRichter


Here's is what I got with some help from the RX Forum:

The idea is to issue a series of "tickets" for the original sequence to fire. These "tickets" are delayed for the timeout, excluding the very first one, which is immediately pre-pended to the ticket sequence. When an event comes in and there is a ticket waiting, the event fires immediately, otherwise it waits till the ticket and then fires. When it fires, the next ticket is issued, and so on...

To combine the tickets and original events, we need a combinator. Unfortunately, the "standard" .CombineLatest cannot be used here because it would fire on tickets and events that were used previousely. So I had to create my own combinator, which is basically a filtered .CombineLatest, that fires only when both elements in the combination are "fresh" - were never returned before. I call it .CombineVeryLatest aka .BrokenZip ;)

Using .CombineVeryLatest, the above idea can be implemented as such:

    public static IObservable<T> SampleResponsive<T>(         this IObservable<T> source, TimeSpan delay)     {         return source.Publish(src =>         {             var fire = new Subject<T>();              var whenCanFire = fire                 .Select(u => new Unit())                 .Delay(delay)                 .StartWith(new Unit());              var subscription = src                 .CombineVeryLatest(whenCanFire, (x, flag) => x)                 .Subscribe(fire);              return fire.Finally(subscription.Dispose);         });     }      public static IObservable<TResult> CombineVeryLatest         <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,         IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector)     {         var ls = leftSource.Select(x => new Used<TLeft>(x));         var rs = rightSource.Select(x => new Used<TRight>(x));         var cmb = ls.CombineLatest(rs, (x, y) => new { x, y });         var fltCmb = cmb             .Where(a => !(a.x.IsUsed || a.y.IsUsed))             .Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; });         return fltCmb.Select(a => selector(a.x.Value, a.y.Value));     }      private class Used<T>     {         internal T Value { get; private set; }         internal bool IsUsed { get; set; }          internal Used(T value)         {             Value = value;         }     } 

Edit: here's another more compact variation of CombineVeryLatest proposed by Andreas Köpf on the forum:

public static IObservable<TResult> CombineVeryLatest   <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource,   IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) {     return Observable.Defer(() =>     {         int l = -1, r = -1;         return Observable.CombineLatest(             leftSource.Select(Tuple.Create<TLeft, int>),             rightSource.Select(Tuple.Create<TRight, int>),                 (x, y) => new { x, y })             .Where(t => t.x.Item2 != l && t.y.Item2 != r)             .Do(t => { l = t.x.Item2; r = t.y.Item2; })             .Select(t => selector(t.x.Item1, t.y.Item1));     }); } 
like image 24
Sergey Aldoukhov Avatar answered Sep 22 '22 20:09

Sergey Aldoukhov