I'm looking for the Rx method that will take an observable and put the latest item on a 'cooldown', so that when items are coming in slower than the cooldown they're just forwarded but when they're coming in faster you just get the latest value after each cooldown period.
Said a different way, I want to switch to sampling with period t
when items are separated by less than t
time (and switch back when they're spread out).
This is really similar to what Observable.Throttle does, except that the timer is not reset whenever a new item arrives.
The application I have in mind is for sending 'latest value' updates across the network. I don't want to communicate a value unless it has changed, and I don't want to spam a rapidly changing value so much that I swamp out other data.
Is there a standard method that does what I need?
Strilanc, given your concern about unwanted activity when the source stream is quiet, you might be interested in this method of pacing events - I wasn't going to add this otherwise, as I think J. Lennon's implementation is perfectly reasonable (and much simpler), and the performance of the timer isn't going to hurt.
There is one other interesting difference in this implementation - it differs from the Sample
approach because it emits events occurring outside the cooldown period immediately rather than at the next sampling interval. It maintains no timer outside the cooldown.
EDIT - Here is v3 solving the issue Chris mentioned in the comments - it ensures that changes occurring during the cool-down themselves trigger a new cool-down period.
public static IObservable<T> LimitRate<T>(
this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
return source.DistinctUntilChanged()
.GroupByUntil(k => 0,
g => Observable.Timer(duration, scheduler))
.SelectMany(x => x.FirstAsync()
.Merge(x.Skip(1)
.TakeLast(1)))
.Select(x => Observable.Return(x)
.Concat(Observable.Empty<T>()
.Delay(duration, scheduler)))
.Concat();
}
This works by initially using a GroupByUntil
to pack all events into the same group for the duration of the cool-down period. It watches for changes and emits the final change (if any) as the group expires.
Then the resulting events are projected into a streams whose OnCompleted is delayed by the cool-down period. These streams are then concatenated together. This prevents events being any closer together than the cool-down, but otherwise they are emitted as soon as possible.
Here are the unit tests (updated for v3 edit), which you can run using nuget packages rx-testing
and nunit
:
public class LimitRateTests : ReactiveTest
{
[Test]
public void SlowerThanRateIsUnchanged()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(200, 1),
OnNext(400, 2),
OnNext(700, 3));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(200, 1),
OnNext(400, 2),
OnNext(700, 3));
}
[Test]
public void FasterThanRateIsSampled()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(140, 5),
OnNext(150, 2),
OnNext(300, 3),
OnNext(350, 4));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(400, 4));
}
[Test]
public void DuplicatesAreOmitted()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(150, 1),
OnNext(300, 1),
OnNext(350, 1));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1));
}
[Test]
public void CoolResetsCorrectly()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(150, 2),
OnNext(205, 3));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3));
}
[Test]
public void MixedPacingWorks()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable(
OnNext(100, 1),
OnNext(150, 1),
OnNext(450, 3),
OnNext(750, 4),
OnNext(825, 5));
var results = scheduler.CreateObserver<int>();
source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);
scheduler.Start();
results.Messages.AssertEqual(
OnNext(100, 1),
OnNext(450, 3),
OnNext(750, 4),
OnNext(850, 5));
}
}
You can use the Observable.DistinctUntilChanged
and Observable.Sample
.
Observable.DistinctUntilChanged
This method will surface values only if they are different from the previous value. (http://www.introtorx.com/content/v1.0.10621.0/05_Filtering.html)
Observable.Sample
The Sample method simply takes the last value for every specified TimeSpan. (http://www.introtorx.com/content/v1.0.10621.0/13_TimeShiftedSequences.html#Sample)
To generate the desired effect, you can combine the first item generated with those described above.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With