Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Forward Rx items with a cooldown, switching to sampling when they come too fast

I'm looking for the Rx method that will take an observable and put the latest item on a 'cooldown', so that when items are coming in slower than the cooldown they're just forwarded but when they're coming in faster you just get the latest value after each cooldown period.

Said a different way, I want to switch to sampling with period t when items are separated by less than t time (and switch back when they're spread out).

This is really similar to what Observable.Throttle does, except that the timer is not reset whenever a new item arrives.

The application I have in mind is for sending 'latest value' updates across the network. I don't want to communicate a value unless it has changed, and I don't want to spam a rapidly changing value so much that I swamp out other data.

Is there a standard method that does what I need?

like image 908
Craig Gidney Avatar asked Feb 14 '23 08:02

Craig Gidney


2 Answers

Strilanc, given your concern about unwanted activity when the source stream is quiet, you might be interested in this method of pacing events - I wasn't going to add this otherwise, as I think J. Lennon's implementation is perfectly reasonable (and much simpler), and the performance of the timer isn't going to hurt.

There is one other interesting difference in this implementation - it differs from the Sample approach because it emits events occurring outside the cooldown period immediately rather than at the next sampling interval. It maintains no timer outside the cooldown.

EDIT - Here is v3 solving the issue Chris mentioned in the comments - it ensures that changes occurring during the cool-down themselves trigger a new cool-down period.

    public static IObservable<T> LimitRate<T>(
        this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
    {
        return source.DistinctUntilChanged()
                     .GroupByUntil(k => 0,
                                   g => Observable.Timer(duration, scheduler))
            .SelectMany(x => x.FirstAsync()
                              .Merge(x.Skip(1)
                                      .TakeLast(1)))
                              .Select(x => Observable.Return(x)
                                .Concat(Observable.Empty<T>()
                                    .Delay(duration, scheduler)))
                                    .Concat();
    }

This works by initially using a GroupByUntil to pack all events into the same group for the duration of the cool-down period. It watches for changes and emits the final change (if any) as the group expires.

Then the resulting events are projected into a streams whose OnCompleted is delayed by the cool-down period. These streams are then concatenated together. This prevents events being any closer together than the cool-down, but otherwise they are emitted as soon as possible.

Here are the unit tests (updated for v3 edit), which you can run using nuget packages rx-testing and nunit:

public class LimitRateTests : ReactiveTest
{
    [Test]
    public void SlowerThanRateIsUnchanged()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(200, 1),
            OnNext(400, 2),
            OnNext(700, 3));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(200, 1),
            OnNext(400, 2),
            OnNext(700, 3));
    }

    [Test]
    public void FasterThanRateIsSampled()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(140, 5),
            OnNext(150, 2),
            OnNext(300, 3),
            OnNext(350, 4));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3),
            OnNext(400, 4));
    }

    [Test]
    public void DuplicatesAreOmitted()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(150, 1),
            OnNext(300, 1),
            OnNext(350, 1));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1));
    }

    [Test]
    public void CoolResetsCorrectly()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(150, 2),
            OnNext(205, 3));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3));
    }

    [Test]
    public void MixedPacingWorks()
    {
        var scheduler = new TestScheduler();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(150, 1),
            OnNext(450, 3),
            OnNext(750, 4),
            OnNext(825, 5));

        var results = scheduler.CreateObserver<int>();

        source.LimitRate(TimeSpan.FromTicks(100), scheduler).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(450, 3),
            OnNext(750, 4),
            OnNext(850, 5));
    }
}
like image 101
James World Avatar answered Feb 16 '23 22:02

James World


You can use the Observable.DistinctUntilChanged and Observable.Sample.

Observable.DistinctUntilChanged

This method will surface values only if they are different from the previous value. (http://www.introtorx.com/content/v1.0.10621.0/05_Filtering.html)

Observable.Sample

The Sample method simply takes the last value for every specified TimeSpan. (http://www.introtorx.com/content/v1.0.10621.0/13_TimeShiftedSequences.html#Sample)

To generate the desired effect, you can combine the first item generated with those described above.

like image 26
J. Lennon Avatar answered Feb 16 '23 22:02

J. Lennon