Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is there something like ThrottleOrMax in rx?

Use case: I'm writing a thing that monitors changes and saves automatically. I want to Throttle so that I don't save more often than every five seconds. I want to save every 30 seconds if there is a continuous stream of changes.

Could not find observable.Throttle(mergeTime, maxTime) in the docs and could only think of ugly ways of writing my own so hence this question.

like image 618
Johan Larsson Avatar asked Feb 09 '23 21:02

Johan Larsson


1 Answers

Here's a way to do it using GroupByUntil:

public static IObservable<T> ThrottleWithMax_GroupBy<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
{
    return source
        .GroupByUntil(
            t => 0, // they all get the same key
            t => t, // the element is the element
            g =>
            {
                // expire the group when it slows down for throttle
                // or when it exceeds maxTime
                return g
                    .Throttle(throttle, scheduler ?? Scheduler.Default)
                    .Timeout(maxTime, Observable.Empty<T>(), scheduler ?? Scheduler.Default);
            })
        .SelectMany(g => g.LastAsync());
}

And here's a way to do it using Window:

public static IObservable<T> ThrottleWithMax_Window<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
{
    return source.Publish(p => p
            .Window(() =>
            {
                // close the window when p slows down for throttle
                // or when it exceeds maxTime.
                // do not start throttling or the maxTime timer
                // until the first p of the new window arrives
                var throttleTimer = p.Throttle(throttle, scheduler ?? Scheduler.Default);
                var timeoutTimer = p.Delay(maxTime, scheduler ?? Scheduler.Default);
                // signal when either timer signals
                return throttleTimer.Amb(timeoutTimer);
            })
            .SelectMany(w => w.TakeLast(1)));
}

Here is an interactive marble diagram (drag the input marbles around):

Examples["throttleWithMax"] = {
  category: "Custom",
  label: "throttleWithMax(5, 10)",
  inputs: [
    [1, 4, 8, 12, 20, 24, 28, 50].map(function(i) {
      return {
        d: i,
        t: i
      };
    }).concat([55])
  ],
  apply: function(inputs, scheduler, Rx) {
    Rx.Observable.prototype.throttleWithMax = function(throttle, maxTime, scheduler) {
      var s = scheduler || Rx.Scheduler.timeout;
      return this
        .publish(function(p) {
          return p
            .window(function() {
              var throttleTimer = p.debounce(throttle, s);
              var timeoutTimer = p.delay(maxTime, s);
              return Rx.Observable.amb(throttleTimer, timeoutTimer);
            })
            .flatMap(function(w) {
              return w.takeLast(1);
            });
        });
    };

    return inputs[0].throttleWithMax(5, 10, scheduler);
  }
};

var d = document.createElement("div");
document.body.appendChild(d);
d.innerHTML = '<rx-marbles key="throttleWithMax"></rx-marbles>';
<script src="http://bman654.github.io/samples/rxmarbles-old/element.js"></script>
<!--[if lt IE 7]>
  <p class="browsehappy">You are using an <strong>outdated</strong> browser. Please <a href="http://browsehappy.com/">upgrade your browser</a> to improve your experience.</p>
<![endif]-->

And here is a unit test that uses TestScheduler to control the clock and take the randomness of the system clock out of it:

private const int _THROTTLE = 50;
private const int _TIMEOUT = 100;
private const int _COMPLETE = 100000;
[TestCase("groupby", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _THROTTLE }, TestName = "g1")]
[TestCase("groupby", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _TIMEOUT }, TestName = "g2")]
[TestCase("groupby", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _THROTTLE, 1000 + _TIMEOUT, 1110 + _THROTTLE }, TestName = "g3")]
[TestCase("window", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _THROTTLE }, TestName = "w1")]
[TestCase("window", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _TIMEOUT }, TestName = "w2")]
[TestCase("window", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _THROTTLE, 1000 + _TIMEOUT, 1110 + _THROTTLE }, TestName = "w3")]
public void Throttle(string which, int[] pattern, int[] expectedPattern, int[] expectedTimes)
{
    var scheduler = new TestScheduler();
    var completeEvent = new[] { ReactiveTest.OnCompleted(_COMPLETE, _COMPLETE) };
    var source = scheduler.CreateColdObservable(pattern.Select(v => ReactiveTest.OnNext(v, v)).Concat(completeEvent).ToArray());
    var throttled = source.ThrottleWithMax(which, TimeSpan.FromTicks(_THROTTLE), TimeSpan.FromTicks(_TIMEOUT), scheduler);
    var observer = scheduler.CreateObserver<int>();
    throttled.Subscribe(observer);

    // start the clock
    scheduler.Start();

    // check the results
    var expected = expectedPattern.Zip(expectedTimes, (v, t) => ReactiveTest.OnNext(t, v)).Concat(completeEvent).ToList();
    CollectionAssert.AreEqual(expected, observer.Messages);
}

Here's the complete unit test code.

like image 100
Brandon Avatar answered Feb 12 '23 09:02

Brandon