Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I rate limit requests losslessly using RxJS 5

I would like to use make a series of requests to a server, but the server has a hard rate limit of 10 request per second. If I try to make the requests in a loop, it will hit the rate limit since all the requests will happen at the same time.

for(let i = 0; i < 20; i++) {
  sendRequest();
}

ReactiveX has lots of tools for modifying observable streams, but I can't seem to find the tools to implement rate limiting. I tried adding a standard delay, but the requests still fire at the same time, just 100ms later than they did previously.

const queueRequest$ = new Rx.Subject<number>();

queueRequest$
  .delay(100)
  .subscribe(queueData => {
    console.log(queueData);
  });

const queueRequest = (id) => queueRequest$.next(id);

function fire20Requests() {
  for (let i=0; i<20; i++) {
    queueRequest(i);
  }
}

fire20Requests();
setTimeout(fire20Requests, 1000);
setTimeout(fire20Requests, 5000);

The debounceTime and throttleTime operators are similar to what I'm looking for as well, but that is lossy instead of lossless. I want to preserve every request that I make, instead of discarding the earlier ones.

...
queueRequest$
  .debounceTime(100)
  .subscribe(queueData => {
    sendRequest();
  });
...

How do I make these requests to the server without exceeding the rate limit using ReactiveX and Observables?

like image 860
Adam Avatar asked Feb 16 '17 20:02

Adam


2 Answers

The implementation in the OP's self answer (and in the linked blog) always imposes a delay which is less than ideal.

If the rate-limited service allows for 10 requests per second, it should be possible to make 10 requests in, say, 10 milliseconds, as long as the next request is not made for another 990 milliseconds.

The implementation below applies a variable delay to ensure the limit is enforced and the delay is only applied to requests that would see the limit exceeded.

function rateLimit(source, count, period) {

  return source
    .scan((records, value) => {

      const now = Date.now();
      const since = now - period;

      // Keep a record of all values received within the last period.

      records = records.filter((record) => record.until > since);
      if (records.length >= count) {

        // until is the time until which the value should be delayed.

        const firstRecord = records[0];
        const lastRecord = records[records.length - 1];
        const until = firstRecord.until + (period * Math.floor(records.length / count));

        // concatMap is used below to guarantee the values are emitted
        // in the same order in which they are received, so the delays
        // are cumulative. That means the actual delay is the difference
        // between the until times.

        records.push({
          delay: (lastRecord.until < now) ?
            (until - now) :
            (until - lastRecord.until),
          until,
          value
        });
      } else {
        records.push({
          delay: 0,
          until: now,
          value
        });
      }
      return records;

    }, [])
    .concatMap((records) => {

      const lastRecord = records[records.length - 1];
      const observable = Rx.Observable.of(lastRecord.value);
      return lastRecord.delay ? observable.delay(lastRecord.delay) : observable;
    });
}

const start = Date.now();
rateLimit(
  Rx.Observable.range(1, 30),
  10,
  1000
).subscribe((value) => console.log(`${value} at T+${Date.now() - start}`));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
like image 125
cartant Avatar answered Nov 16 '22 03:11

cartant


This blog post does a great job of explaining that RxJS is great at discarding events, and how they came to the answer, but ultimately, the code you're looking for is:

queueRequest$
  .concatMap(queueData => Rx.Observable.of(queueData).delay(100))
  .subscribe(() => {
    sendRequest();
  });

concatMap adds concatenates the newly created observable to the back of the observable stream. Additionally, using delay pushes back the event by 100ms, allowing 10 request to happen per second. You can view the full JSBin here, which logs to the console instead of firing requests.

like image 33
Adam Avatar answered Nov 16 '22 03:11

Adam