Use case: I'm writing a component that monitors changes and saves automatically. I want to throttle the saves so that I don't save more often than once every five seconds, but if there is a continuous stream of changes I still want to save at least once every 30 seconds.
I could not find an observable.Throttle(mergeTime, maxTime) overload in the docs and could only think of ugly ways of writing my own, hence this question.
Here's a way to do it using GroupByUntil:
/// <summary>
/// Throttles <paramref name="source"/> while capping how long a burst of elements may run
/// before a value is emitted. All elements are placed in a single group; the group is closed
/// when the stream goes quiet for <paramref name="throttle"/> or when <paramref name="maxTime"/>
/// passes without the throttle firing, and the last element of the closed group is emitted.
/// </summary>
/// <typeparam name="T">Element type of the sequence.</typeparam>
/// <param name="source">The sequence to throttle.</param>
/// <param name="throttle">Quiet period handed to <c>Throttle</c>.</param>
/// <param name="maxTime">Due time handed to <c>Timeout</c>.</param>
/// <param name="scheduler">Scheduler for both timers; <c>Scheduler.Default</c> is used when null.</param>
/// <returns>A sequence containing the last element of each group.</returns>
public static IObservable<T> ThrottleWithMax_GroupBy<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
{
    // Resolve the fallback once instead of at every use site.
    var clock = scheduler ?? Scheduler.Default;

    var bursts = source.GroupByUntil(
        item => 0,     // constant key: every element lands in the same group
        item => item,  // elements are passed through unchanged
        // The group's duration ends when the throttled group produces a value, or when
        // Timeout gives up and switches to an empty sequence that completes immediately.
        burst => burst
            .Throttle(throttle, clock)
            .Timeout(maxTime, Observable.Empty<T>(), clock));

    // Only the most recent element of each closed group is forwarded.
    return bursts.SelectMany(burst => burst.LastAsync());
}
And here's a way to do it using Window:
/// <summary>
/// Throttles <paramref name="source"/> while capping how long a burst of elements may run
/// before a value is emitted. The published source is split into windows; each window is
/// closed by whichever signals first: the source throttled by <paramref name="throttle"/>,
/// or the source delayed by <paramref name="maxTime"/>. The last element of each window is emitted.
/// </summary>
/// <typeparam name="T">Element type of the sequence.</typeparam>
/// <param name="source">The sequence to throttle.</param>
/// <param name="throttle">Quiet period handed to <c>Throttle</c>.</param>
/// <param name="maxTime">Delay handed to <c>Delay</c>.</param>
/// <param name="scheduler">Scheduler for both timers; <c>Scheduler.Default</c> is used when null.</param>
/// <returns>A sequence containing the last element of each non-empty window.</returns>
public static IObservable<T> ThrottleWithMax_Window<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
{
    // Resolve the fallback once instead of at every use site.
    var clock = scheduler ?? Scheduler.Default;

    return source.Publish(shared =>
    {
        // Both closing signals are derived from the shared source itself, so neither
        // timer starts running until the first element of the new window arrives.
        var windows = shared.Window(() =>
            shared.Throttle(throttle, clock)
                .Amb(shared.Delay(maxTime, clock)));

        // Forward only the final element of each window.
        return windows.SelectMany(window => window.TakeLast(1));
    });
}
Here is an interactive marble diagram (drag the input marbles around):
// Marble-diagram example entry for a custom throttleWithMax operator,
// shown with a throttle of 5 and a maximum of 10.
Examples.throttleWithMax = {
  category: "Custom",
  label: "throttleWithMax(5, 10)",
  inputs: [
    // One marble per tick; each marble's data value equals its time.
    // NOTE(review): the trailing bare 55 is presumably the stream's end time - confirm
    // against the rx-marbles input format.
    [1, 4, 8, 12, 20, 24, 28, 50].map(function(tick) {
      return { d: tick, t: tick };
    }).concat([55])
  ],
  apply: function(inputs, scheduler, Rx) {
    // Install the operator on the Rx instance handed to this example.
    Rx.Observable.prototype.throttleWithMax = function(throttle, maxTime, scheduler) {
      var timerScheduler = scheduler || Rx.Scheduler.timeout;
      return this.publish(function(shared) {
        // A window closes on whichever fires first: the debounced source
        // or the source delayed by maxTime.
        var windows = shared.window(function() {
          return Rx.Observable.amb(
            shared.debounce(throttle, timerScheduler),
            shared.delay(maxTime, timerScheduler)
          );
        });
        // Emit only the last value seen in each window.
        return windows.flatMap(function(win) {
          return win.takeLast(1);
        });
      });
    };
    var input = inputs[0];
    return input.throttleWithMax(5, 10, scheduler);
  }
};
// Create a host <div> and attach it to the page body.
var d = document.createElement("div");
document.body.appendChild(d);
// Inject the <rx-marbles> element; its key is the same string used for the Examples entry
// ("throttleWithMax"), which is presumably how the element finds the example to render.
d.innerHTML = '<rx-marbles key="throttleWithMax"></rx-marbles>';
<!-- Script for the rx-marbles element used above. Loaded over HTTPS so that browsers do not
     block it as mixed active content when the embedding page is itself served over HTTPS. -->
<script src="https://bman654.github.io/samples/rxmarbles-old/element.js"></script>
<!--[if lt IE 7]>
<p class="browsehappy">You are using an <strong>outdated</strong> browser. Please <a href="http://browsehappy.com/">upgrade your browser</a> to improve your experience.</p>
<![endif]-->
And here is a unit test that uses TestScheduler to control the clock and take the randomness of the system clock out of it:
// Throttle interval in ticks; the test converts it with TimeSpan.FromTicks and passes it
// as the first TimeSpan argument of ThrottleWithMax.
private const int _THROTTLE = 50;
// Maximum-time interval in ticks; converted with TimeSpan.FromTicks and passed as the
// second TimeSpan argument of ThrottleWithMax.
private const int _TIMEOUT = 100;
// Virtual time (in ticks) of the OnCompleted message appended to both the input and the
// expected output; the same value is also passed as OnCompleted's second argument.
private const int _COMPLETE = 100000;
/// <summary>
/// Runs one ThrottleWithMax implementation (selected by <paramref name="which"/>) against a
/// cold test sequence on a <c>TestScheduler</c> and checks the recorded output messages.
/// </summary>
/// <param name="which">Implementation selector forwarded to <c>ThrottleWithMax</c>.</param>
/// <param name="pattern">Input ticks; each input element's value equals the tick it is emitted at.</param>
/// <param name="expectedPattern">Values expected in the output, in order.</param>
/// <param name="expectedTimes">Tick at which each expected value should be observed.</param>
[TestCase("groupby", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _THROTTLE }, TestName = "g1")]
[TestCase("groupby", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _TIMEOUT }, TestName = "g2")]
[TestCase("groupby", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _THROTTLE, 1000 + _TIMEOUT, 1110 + _THROTTLE }, TestName = "g3")]
[TestCase("window", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _THROTTLE }, TestName = "w1")]
[TestCase("window", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _TIMEOUT }, TestName = "w2")]
[TestCase("window", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _THROTTLE, 1000 + _TIMEOUT, 1110 + _THROTTLE }, TestName = "w3")]
public void Throttle(string which, int[] pattern, int[] expectedPattern, int[] expectedTimes)
{
    // Arrange: a virtual clock and a cold source that emits each tick as its own value,
    // followed by a completion message at _COMPLETE.
    var clock = new TestScheduler();
    var completion = new[] { ReactiveTest.OnCompleted(_COMPLETE, _COMPLETE) };
    var input = pattern
        .Select(tick => ReactiveTest.OnNext(tick, tick))
        .Concat(completion)
        .ToArray();
    var source = clock.CreateColdObservable(input);
    var sink = clock.CreateObserver<int>();

    // Act: subscribe through the operator under test, then run the virtual clock.
    source
        .ThrottleWithMax(which, TimeSpan.FromTicks(_THROTTLE), TimeSpan.FromTicks(_TIMEOUT), clock)
        .Subscribe(sink);
    clock.Start();

    // Assert: expected values paired with their expected times, then the completion message.
    var expected = expectedPattern
        .Zip(expectedTimes, (value, time) => ReactiveTest.OnNext(time, value))
        .Concat(completion)
        .ToList();
    CollectionAssert.AreEqual(expected, sink.Messages);
}
Here's the complete unit test code.
If you love what we do, you can donate to us via PayPal or buy us a coffee so we can maintain and grow. Thank you!
Donate Us With