Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What is the best way to "rate limit" consuming of an Observable?

I have a bunch of events coming in and I have to execute ALL of them without a loss, but I want to make sure that they are buffered and consumed at the appropriate time slots. Anyone have a solution?

I can't find any operators in Rx that can do that without the loss of the events (Throttle - looses events). I've also considered Buffered, Delay, etc... Can't find a good solution.

I've tried to put a timer in the middle, but somehow it doesn't work at all:

GetInitSequence()
            .IntervalThrottle(TimeSpan.FromSeconds(5))
            .Subscribe(
                item =>
                    {
                        Console.WriteLine(DateTime.Now);
                        // Process item
                    }
            );

public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime)
    {
        return Observable.Create<T>(o =>
            {
                return source.Subscribe(x =>
                    {
                        new Timer(state => 
                            o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1));
                    }, o.OnError, o.OnCompleted);
        });
    }
like image 578
IgorM Avatar asked Jul 01 '12 17:07

IgorM


1 Answers

The question is not 100% clear so I'm making some presumptions.

Observable.Delay is not what you want because that will create a delay from when each event arrives, rather than creating even time intervals for processing.

Observable.Buffer is not what you want because that will cause all events in each given interval to be passed to you, rather than one at a time.

So I believe you're looking for a solution that creates some sort of metronome that ticks away, and gives you an event per tick. This can be naively constructed using Observable.Interval for the metronome and Zip for connecting it to your source:

var source = GetInitSequence();
var trigger = Observable.Interval(TimeSpan.FromSeconds(5));    
var triggeredSource = source.Zip(trigger, (s,_) => s); 
triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now));

This will trigger every 5 seconds (in the example above), and give you the original items in sequence.

The only problem with this solution is that if you don't have any more source elements for (say) 10 seconds, when the source elements arrive they will be immediately sent out since some of the 'trigger' events are sitting there waiting for them. Marble diagram for that scenario:

source:  -a-b-c----------------------d-e-f-g
trigger: ----o----o----o----o----o----o----o
result:  ----a----b----c-------------d-e-f-g

This is a very reasonable issue. There are two questions here already that tackle it:

Rx IObservable buffering to smooth out bursts of events

A way to push buffered events in even intervals

The solution provided is a main Drain extension method and secondary Buffered extension. I've modified these to be far simpler (no need for Drain, just use Concat). Usage is:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5));

The extension method StepInterval:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Select(x => 
        Observable.Empty<T>()
            .Delay(minDelay)
            .StartWith(x)
    ).Concat();
}
like image 117
yamen Avatar answered Sep 24 '22 22:09

yamen