Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

rxjs periodic polling of an endpoint with a variable response time

I want to poll an endpoint no faster than once a second, and no slower than the time it takes to poll the endpoint. There should never be more than one request outstanding.

I want a reactive programming way to poll an endpoint at least once a second, but if the endpoint takes longer than 1 second, the next request fires immediately.

In the marble diagram below, the 2nd and 3rd requests take longer than 1 second, but the 4th and 5th requests finish quicker. The next request fires either on the 1 second boundary, or immediately upon obtaining the data from the last outstanding request.

s---s---s---s---s---s---| # 1 second interval observable
r---r----r--------r-r---| # endpoint begin polling events
-d-------d--------dd-d--| # endpoint data response events

I'm trying to get the terminology correct in the marble diagram, so I'm assuming that the beginning of the endpoint requests should be the marble I label "r", and the marble event I label "d" has the endpoint data.

Here's how much code it took me to do this in plain js; however, a subsequent request does not fire immediately once the previous response has arrived, as I asked for above.

// Handle returned by $interval while polling is scheduled; undefined when stopped.
var poll;
// Copy of the last dashboard payload that was broadcast, used to skip unchanged data.
var previousData;
// True while a dashboard request is in flight; guards against overlapping requests.
var isPolling = false;
// Endpoint passed to restfulService.get for every poll.
var dashboardUrl = 'gui/metrics/dashboard';
// Polling period handed to $interval, read once from the update service configuration.
var intervalMs = updateServiceConfig.getIntervalInMilliSecondForCharts();

// Public API: only starting and stopping the polling interval are exposed.
return {
    startInterval: startInterval,
    stopInterval: stopInterval
};

/**
 * (Re)start polling: cancel any interval that is already scheduled, attempt one
 * poll right away, then schedule tryPolling to run every intervalMs.
 */
function startInterval() {
    // cancel a previously scheduled interval so only one is ever active
    stopInterval();
    tryPolling(); // immediately hit the dashboard
    // attempt polling at the interval
    poll = $interval(tryPolling, intervalMs);
}

/**
 * Start a dashboard request unless one is already in flight.
 * The in-flight flag is released when that request settles, whether it
 * succeeded or failed, so that a later call can issue the next request.
 */
function tryPolling() {
    // a request is still outstanding: skip this attempt entirely
    if (isPolling) {
        return;
    }

    isPolling = true;

    // success and failure both release the flag
    getDashboard().then(resetPolling, resetPolling);
}

/**
 * Clear the in-flight flag so the next tryPolling call may issue a request.
 * tryPolling registers this as both the success and the failure handler of the
 * dashboard request.
 */
function resetPolling() {
    isPolling = false;
}

/**
 * Cancel the scheduled polling interval, if there is one, and forget its handle.
 * Does nothing when no interval is currently scheduled.
 */
function stopInterval() {
    if (!poll) {
        return;
    }

    $interval.cancel(poll);
    poll = undefined;
}

/**
 * Request the dashboard from dashboardUrl and pass the response to updateDashboard.
 * Returns whatever the chained `then` call returns, so callers can attach their
 * own handlers.
 */
function getDashboard() {
    var pendingResponse = restfulService.get(dashboardUrl);
    return pendingResponse.then(updateDashboard);
}

/**
 * Broadcast '$dashboardLoaded' with the new data, but only when it differs
 * (by deep comparison) from the previously stored payload. A copy of the data is
 * kept for the next comparison.
 */
function updateDashboard(data) {
    // unchanged payload: nothing to store or announce
    if (utils.deepEqual(data, previousData)) {
        return;
    }

    previousData = angular.copy(data);
    $rootScope.$broadcast('$dashboardLoaded', data);
}
like image 612
activedecay Avatar asked Jan 11 '18 17:01

activedecay


3 Answers

Here is my solution. It uses an internal subject, combineLatest and filter to ensure that requests don't accumulate if the responses are slower to arrive than the timer period.

The comments should explain how it works.

// Simulated response times in milliseconds, used round-robin by mock().
const delays = [100, 2000, 100, 3000];
// Reference timestamp so log lines show elapsed time.
const since = Date.now();
// Number of mock requests created so far; selects the next delay.
let index = 0;

/**
 * Create a fake request observable that emits "res" after the next delay in
 * `delays`, logging the elapsed time when the request starts and when the
 * response is delivered.
 */
function mock() {
    const elapsed = () => Date.now() - since;
    const latency = delays[index % delays.length];
    index += 1;

    const request = Rx.Observable.of("res");
    const announced = request.do(() => console.log("mock req at ", elapsed(), " ms"));
    const delayed = announced.delay(latency);
    return delayed.do(() => console.log("mock res at ", elapsed(), " ms"));
}

/**
 * Build a polling observable that emits the responses of mock().
 *
 * A request is started only for a timer tick that differs from the tick stored
 * in the internal state and only while no request is pending, so at most one
 * request is outstanding at any time. The state lives in a BehaviorSubject that
 * is created per subscription.
 */
function poll() {

  return Rx.Observable.defer(() => {

    // Use defer so that the internal subject is created for each
    // subscription.
    // The seed tick of -1 differs from the timer's first value (0), so the
    // first tick is treated as new.
    const subject = new Rx.BehaviorSubject({ tick: -1, pending: false });

    return Rx.Observable
    
      // Combine the timer and the subject's state.
      // Every timer tick AND every subject.next below produces a new
      // [tick, state] combination.
      .combineLatest(
        Rx.Observable.timer(0, 1000).do(tick => console.log("tick", tick)),
        subject
      )

      // Filter out combinations in which either a more recent tick
      // has not occurred or a request is pending.
      .filter(([tick, state]) => (tick !== state.tick) && !state.pending)

      // Update the subject's state.
      // Record the tick being served and mark the request as pending so the
      // filter above rejects further combinations until the response arrives.
      .do(([tick]) => subject.next({ tick, pending: true }))
      
      // Make the request and use the result selector to combine
      // the tick and the response.
      .mergeMap(([tick]) => mock(), ([tick], resp) => [tick, resp])

      // Update the subject's state.
      // Clearing `pending` re-emits the combination with the latest tick; if a
      // newer tick arrived while the request was running, it passes the filter
      // and the next request starts right away.
      .do(([tick]) => subject.next({ tick, pending: false }))
      
      // Map the response.
      .map(([tick, resp]) => resp);
  });
}

// Subscribe, log each response, and complete after one response per entry in `delays`.
poll().take(delays.length).subscribe(r => console.log(r));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

It's just occurred to me that there is an operator that does exactly this: exhaustMap.

// Simulated response times in milliseconds, used round-robin by mock().
const delays = [100, 2000, 100, 3000];
// Reference timestamp so log lines show elapsed time.
const since = Date.now();
// Number of mock requests created so far; selects the next delay.
let index = 0;

/**
 * Create a fake request observable that emits "res" after the next delay in
 * `delays`, logging the elapsed time when the request starts and when the
 * response is delivered.
 */
function mock() {
  const elapsed = () => Date.now() - since;
  const latency = delays[index % delays.length];
  index += 1;

  const request = Rx.Observable.of("res");
  const announced = request.do(() => console.log("mock req at ", elapsed(), " ms"));
  const delayed = announced.delay(latency);
  return delayed.do(() => console.log("mock res at ", elapsed(), " ms"));
}

// Log each timer tick as it is emitted.
const logTick = tick => console.log("tick", tick);

// Tick immediately and then every second; exhaustMap starts a mock() request
// for a tick only when no earlier request is still active, and ignores the
// ticks that arrive in the meantime.
const poll = Rx.Observable
  .timer(0, 1000)
  .do(logTick)
  .exhaustMap(() => mock());

// Log each response and complete after one response per entry in `delays`.
const responses = poll.take(delays.length);
responses.subscribe(r => console.log(r));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
like image 191
cartant Avatar answered Sep 21 '22 22:09

cartant


I believe this does what you want:

// Number of simulated API calls made so far.
let counter = 0;

/**
 * Simulate an API call: returns an observable that, after a random delay of
 * less than one second, emits the 1-based sequence number of this call.
 */
function apiCall() {
  const latencyMs = Math.random() * 1000;
  counter += 1;
  const sequenceNumber = counter;

  const response$ = Rx.Observable.timer(latencyMs);
  return response$.mapTo(sequenceNumber);
}

// Tick immediately and then once per second.
const everySecond$ = Rx.Observable.timer(0, 1000);

// Start an API call on every tick and keep only the first response to arrive.
const firstResponse$ = everySecond$
  .mergeMap(() => apiCall())
  .take(1);

// Once a response has been taken, start the whole sequence over.
firstResponse$
  .repeat()
  .subscribe(x => { console.log(x); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
  • timer(0, 1000): emits immediately and on a one second interval after that
  • mergeMap(...): switches to the observable returned by the api call. This will generate a new observable on each retry. If you don't want to create a new one on each retry then replace this with mergeMapTo(apiCall()).
  • take(1): forces the subscription to complete so the timer doesn't fire once the api has emitted
  • repeat(): start the sequence over when the api emits

So the call will be made immediately to the api. If it doesn't return within one second then another call will be made each second. Once there is a response from one of the api calls the timer will be canceled and the whole sequence started over. This will not cancel in-flight requests which I believe is in line with your intent.

EDIT: If a later request returns before a previous request then the previous request will be thrown out.

like image 35
bygrace Avatar answered Sep 21 '22 22:09

bygrace


I had to think about this for 15 minutes before I came up with an answer based only on rxjs, without side effects (no variable assignment) AND without back pressure!

const { Observable } = Rx;

/**
 * Fake HTTP request: an observable that logs 'fetching...' and emits 'ok'
 * after 250 ms. The `url` argument is accepted but not used.
 */
const mockHttpRequest = url => {
  const response$ = Observable.of('ok');
  const logged$ = response$.do(_value => console.log('fetching...'));
  return logged$.delay(250);
};

/**
 * Repeatedly run `httpRequest$`, pacing each round with a timer of `ms`
 * milliseconds: zip pairs the request result with the timer, the pair is
 * repeated indefinitely, and only the request result is emitted.
 */
const poll = (httpRequest$, ms) => {
  const firstOf = pair => pair[0];

  const minimumGap$ = Observable.timer(ms);
  const paced$ = Observable.zip(httpRequest$, minimumGap$);

  return paced$.repeat().map(firstOf);
};

// Example usage: poll the mock request with a 1000 ms timer, log every result,
// and subscribe to start the polling.
poll(mockHttpRequest('your-url-here'), 1000)
  .do(console.log)
  .subscribe();

Here's a working Plunkr: https://plnkr.co/edit/sZTjLedNCE64bgLNhnaS?p=preview

like image 39
maxime1992 Avatar answered Sep 22 '22 22:09

maxime1992