The Throttle method skips values from an observable sequence if others follow too quickly. But I need a method that just delays them. That is, I need to enforce a minimum delay between items, without skipping any.
Practical example: there's a web service that accepts requests no faster than once a second, and a user who can add requests, singly or in batches. Without Rx, I'd create a list and a timer. When the user adds requests, I'd add them to the list. In the timer event, I'd check whether the list is empty. If it isn't, I'd send a request and remove the corresponding item, with locks and all that stuff. Now, with Rx, I can create a Subject and add items to it when the user adds requests. But I need a way to make sure the web service is not flooded, by applying delays.
I'm new to Rx, so maybe I'm missing something obvious.
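For comparison, the list-and-timer approach described above might look roughly like the sketch below. It uses only System.Threading.Timer and a lock, no Rx. The class name PacedQueue, its members, and the demo values are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: queues items and hands at most one of them
// to the send callback per timer tick.
public sealed class PacedQueue<T> : IDisposable
{
    private readonly Queue<T> _pending = new Queue<T>();
    private readonly object _gate = new object();
    private readonly Action<T> _send;
    private readonly Timer _timer;

    public PacedQueue(TimeSpan interval, Action<T> send)
    {
        _send = send;
        // First tick after one interval, then one tick per interval.
        _timer = new Timer(_ => Tick(), null, interval, interval);
    }

    // Called whenever the user adds a request (single or as part of a batch).
    public void Add(T item)
    {
        lock (_gate) { _pending.Enqueue(item); }
    }

    private void Tick()
    {
        T item;
        lock (_gate)
        {
            if (_pending.Count == 0) return; // nothing to send on this tick
            item = _pending.Dequeue();
        }
        _send(item); // invoked outside the lock so Add is never blocked by a send
    }

    public void Dispose() => _timer.Dispose();
}

public static class PacedQueueDemo
{
    public static void Main()
    {
        using (var queue = new PacedQueue<string>(
            TimeSpan.FromSeconds(1),
            r => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} sending {r}")))
        {
            queue.Add("a");
            queue.Add("b");
            queue.Add("c");
            Thread.Sleep(4000); // leave time for the three ticks
        }
    }
}
```

Note that with this sketch an item always waits for the next tick, even when the queue has been idle for a long time, and timer callbacks can overlap if a send takes longer than the interval.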
There's a fairly easy way to do what you want using an EventLoopScheduler.
I started out with an observable that will randomly produce values once every 0 to 3 seconds.
var rnd = new Random();
var xs =
    Observable
        .Generate(
            0,
            x => x < 20,
            x => x + 1,
            x => x,
            x => TimeSpan.FromSeconds(rnd.NextDouble() * 3.0));
Now, to make this output values immediately unless the previous value was emitted less than a second ago, I did this:
var ys =
    Observable.Create<int>(o =>
    {
        var els = new EventLoopScheduler();
        return xs
            .ObserveOn(els)
            .Do(x => els.Schedule(() => Thread.Sleep(1000)))
            .Subscribe(o);
    });
This effectively observes the source on the EventLoopScheduler and then puts it to sleep for 1 second after each OnNext, so that it can only begin the next OnNext after it wakes up.
I tested that it worked with this code:
ys
    .Timestamp()
    .Select(x => x.Timestamp.Second + (double)x.Timestamp.Millisecond / 1000.0)
    .Subscribe(x => Console.WriteLine(x));
I hope this helps.
How about a simple extension method:
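public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
    // Each item becomes a small sequence that emits the item at once and
    // then completes only after minDelay; Concat plays these one after another.
    return source.Select(x =>
        Observable.Empty<T>()
            .Delay(minDelay)
            .StartWith(x)
    ).Concat();
}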
Usage:
var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(1));
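A minimal, self-contained sketch of how this could be wired to the Subject from the question follows. It assumes the System.Reactive package is referenced. StepInterval is repeated so the sample compiles on its own, and the class name, request values, and sleep durations are placeholders.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class StepIntervalDemo
{
    // Same extension method as above, repeated to keep the sample standalone.
    public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
    {
        return source.Select(x =>
            Observable.Empty<T>()
                .Delay(minDelay)
                .StartWith(x)
        ).Concat();
    }

    public static void Main()
    {
        // The user pushes requests into this subject, singly or in batches.
        var requests = new Subject<int>();

        using (requests
            .StepInterval(TimeSpan.FromSeconds(1))
            .Timestamp()
            .Subscribe(x => Console.WriteLine($"{x.Timestamp:HH:mm:ss.fff} sending request {x.Value}")))
        {
            // A batch of three requests arriving at once.
            requests.OnNext(1);
            requests.OnNext(2);
            requests.OnNext(3);

            // Keep the process alive long enough for the paced output.
            Thread.Sleep(4000);
        }
    }
}
```

With this wiring the first request of a batch should go out immediately and each later one roughly a second after the previous one, since every inner sequence stays open for minDelay after emitting its item.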
I want to suggest an approach using Observable.Zip:
// Incoming requests
var requests = new[] { 1, 2, 3, 4, 5 }.ToObservable();

// Defines the frequency of the incoming requests.
// This emulates a flood of incoming requests and, by the way,
// uses the same approach as the solution below.
var requestsTimer = Observable.Interval(TimeSpan.FromSeconds(0.1));
var incomingRequests = Observable.Zip(requests, requestsTimer, (number, time) => number);
incomingRequests.Subscribe(number =>
{
    Console.WriteLine($"Request received: {number}");
});

// This is the minimum interval at which we want to process the incoming requests
var processingTimeInterval = Observable.Interval(TimeSpan.FromSeconds(1));

// Zipping incoming requests with the interval
var requestsToProcess = Observable.Zip(incomingRequests, processingTimeInterval, (data, time) => data);
requestsToProcess.Subscribe(number =>
{
    Console.WriteLine($"Request processed: {number}");
});
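One thing worth checking with the Zip approach is what happens after an idle period. Zip queues unmatched values from both sides, so interval ticks that fire while no requests are pending are kept and can pair instantly with a later batch. The sketch below tries to make that visible. It assumes the System.Reactive package is referenced; the class name, the Run helper, and all timings are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class ZipBurstDemo
{
    // Hypothetical helper: stays idle for a while, pushes a burst of requests,
    // and returns the elapsed time at which each one was released by Zip.
    public static List<TimeSpan> Run(TimeSpan interval, TimeSpan idle, int burstSize)
    {
        var released = new List<TimeSpan>();
        var requests = new Subject<int>();
        var ticks = Observable.Interval(interval);
        var clock = Stopwatch.StartNew();

        using (requests
            .Zip(ticks, (request, tick) => request)
            .Subscribe(_ => { lock (released) { released.Add(clock.Elapsed); } }))
        {
            Thread.Sleep(idle);               // ticks pile up inside Zip meanwhile
            for (var i = 1; i <= burstSize; i++)
            {
                requests.OnNext(i);           // each request pairs with a queued tick
            }
            Thread.Sleep(interval + interval); // allow any remaining pairs through
        }

        lock (released) { return new List<TimeSpan>(released); }
    }

    public static void Main()
    {
        // Three ticks should be queued by the time the burst of three arrives.
        var times = Run(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3.5), 3);
        foreach (var t in times)
        {
            Console.WriteLine($"released at {t.TotalSeconds:F2}s");
        }
    }
}
```

If the release times come out nearly identical, the minimum spacing is not being enforced after a pause, and one of the other approaches may be a better fit for a service that must never see two requests within a second.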