Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Throttle Rx.Observable without skipping values

Throttle method skips values from an observable sequence if others follow too quickly. But I need a method to just delay them. That is, I need to set a minimum delay between items, without skipping any.

Practical example: there's a web service which can accept requests no faster than once a second; there's a user who can add requests, single or in batches. Without Rx, I'll create a list and a timer. When users adds requests, I'll add them to the list. In the timer event, I'll check wether the list is empty. If it is not, I'll send a request and remove the corresponding item. With locks and all that stuff. Now, with Rx, I can create Subject, add items when users adds requests. But I need a way to make sure the web service is not flooded by applying delays.

I'm new to Rx, so maybe I'm missing something obvious.

like image 464
Athari Avatar asked Jul 25 '12 16:07

Athari


3 Answers

There's a fairly easy way to do what you want using an EventLoopScheduler.

I started out with an observable that will randomly produce values once every 0 to 3 seconds.

var rnd = new Random();

var xs =
    Observable
        .Generate(
            0,
            x => x < 20,
            x => x + 1,
            x => x,
            x => TimeSpan.FromSeconds(rnd.NextDouble() * 3.0));

Now, to make this output values immediately unless the last value was within a second ago I did this:

var ys =
    Observable.Create<int>(o =>
    {
        var els = new EventLoopScheduler();
        return xs
            .ObserveOn(els)
            .Do(x => els.Schedule(() => Thread.Sleep(1000)))
            .Subscribe(o);
    });

This effectively observes the source on the EventLoopScheduler and then puts it to sleep for 1 second after each OnNext so that it can only begin the next OnNext after it wakes up.

I tested that it worked with this code:

ys
    .Timestamp()
    .Select(x => x.Timestamp.Second + (double)x.Timestamp.Millisecond/1000.0)
    .Subscribe(x => Console.WriteLine(x));

I hope this helps.

like image 110
Enigmativity Avatar answered Nov 17 '22 22:11

Enigmativity


How about a simple extension method:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Select(x => 
        Observable.Empty<T>()
            .Delay(minDelay)
            .StartWith(x)
    ).Concat();
}

Usage:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(1));
like image 5
yamen Avatar answered Nov 18 '22 00:11

yamen


I want to suggest an approach with using Observable.Zip:

// Incoming requests
var requests = new[] {1, 2, 3, 4, 5}.ToObservable();

// defines the frequency of the incoming requests
// This is the way to emulate flood of incoming requests.
// Which, by the way, uses the same approach that will be used in the solution
var requestsTimer = Observable.Interval(TimeSpan.FromSeconds(0.1)); 
var incomingRequests = Observable.Zip(requests, requestsTimer, (number, time) => {return number;});
incomingRequests.Subscribe((number) =>
{
    Console.WriteLine($"Request received: {number}");
});

// This the minimum interval at which we want to process the incoming requests
var processingTimeInterval = Observable.Interval(TimeSpan.FromSeconds(1));

// Zipping incoming requests with the interval
var requestsToProcess = Observable.Zip(incomingRequests, processingTimeInterval, (data, time) => {return data;});

requestsToProcess.Subscribe((number) =>
{
    Console.WriteLine($"Request processed: {number}");
});
like image 2
Varvara Kalinina Avatar answered Nov 18 '22 00:11

Varvara Kalinina