
Throttle amount of promises open at a given time

The following TypeScript performs each call to doSomething(action) one at a time, meaning the call for the second item in the list is not made until the first one is done.

async performActionsOneAtATime() {
    for (const action of listOfActions) {
        // await pauses the loop until this call has finished
        const actionResult = await doSomething(action);
        console.log(`Action Done: ${actionResult}`);
    }
}

This one will send all the requests to the server right away (without waiting for any responses):

async performActionsInParallel() {
    for (let action of listOfActions) {
        const actionResultPromise = doSomething(action);
        actionResultPromise.then((actionResult) => {
            console.log(`Action Done: ${actionResult}`);
        });
    }
}

But what I really need is a way to throttle them, with maybe 10 or 20 calls open at a time. (One at a time is too slow, but all 600 at once will overload the server.)

I am having a hard time figuring this out.

Any suggestions on how I can throttle the calls so that only X are open at a time?

(This question uses TypeScript, but I would be fine with an ES6 JavaScript answer.)

Vaccano asked Jul 14 '16 22:07



2 Answers

You can do this in one short function. (Returns values in order per naomik's suggestion. Thanks!)

/**
 * Performs a list of callable actions (promise factories) so
 * that only a limited number of promises are pending at any
 * given time.
 *
 * @param listOfCallableActions An array of callable functions,
 *     which should return promises.
 * @param limit The maximum number of promises to have pending
 *     at once.
 * @returns A Promise that resolves to the full list of values
 *     when everything is done.
 */
function throttleActions(listOfCallableActions, limit) {
  // We'll need to store which is the next promise in the list.
  let i = 0;
  let resultArray = new Array(listOfCallableActions.length);

  // Now define what happens when any of the actions completes.
  // JavaScript is (mostly) single-threaded, so only one
  // completion handler will run at a given time. Because we
  // chain doNextAction after each action, the Promise chain
  // continues as long as there's an action left in the list.
  function doNextAction() {
    if (i < listOfCallableActions.length) {
      // Save the current value of i, so we can put the result
      // in the right place
      let actionIndex = i++;
      let nextAction = listOfCallableActions[actionIndex];
      return Promise.resolve(nextAction()).then(result => {
        // Save results to the correct array index.
        resultArray[actionIndex] = result;
      }).then(doNextAction);
    }
  }

  // Now start up the original <limit> number of promises.
  // i advances in calls to doNextAction.
  let listOfPromises = [];
  while (i < limit && i < listOfCallableActions.length) {
    listOfPromises.push(doNextAction());
  }
  return Promise.all(listOfPromises).then(() => resultArray);
}

// Test harness:

function delay(name, ms) {
  return new Promise((resolve, reject) => setTimeout(() => {
    console.log(name);
    resolve(name);
  }, ms));
}

var ps = [];
for (let i = 0; i < 10; i++) {
  ps.push(() => delay("promise " + i, Math.random() * 3000));
}

throttleActions(ps, 3).then(result => console.log(result));
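
Since the question is in TypeScript, here is a rough typed sketch of the same idea applied to the question's setup. It is an adaptation, not the code above verbatim: the names Action and runLane are made up for illustration, and listOfActions and doSomething are hypothetical stand-ins for the asker's real list and server call. The open/maxOpen counters exist only to show that the limit holds.

```typescript
// A promise factory: calling it starts the work.
type Action<T> = () => Promise<T>;

function throttleActions<T>(actions: Action<T>[], limit: number): Promise<T[]> {
  let next = 0; // index of the next action to start
  const results: T[] = new Array(actions.length);

  // Each "lane" runs one action, stores its result at the action's own
  // index, then picks up the next unstarted action.
  function runLane(): Promise<void> {
    if (next >= actions.length) return Promise.resolve();
    const index = next++;
    return actions[index]().then((value) => {
      results[index] = value;
      return runLane();
    });
  }

  // Start at most `limit` lanes; each lane has one promise pending at a time.
  const lanes: Promise<void>[] = [];
  for (let k = 0; k < Math.min(limit, actions.length); k++) {
    lanes.push(runLane());
  }
  return Promise.all(lanes).then(() => results);
}

// Hypothetical stand-ins for the question's listOfActions and doSomething.
const listOfActions: string[] = Array.from({ length: 25 }, (_, n) => `action-${n}`);
let open = 0;    // calls currently in flight
let maxOpen = 0; // highest number of calls seen in flight at once
function doSomething(action: string): Promise<string> {
  open++;
  maxOpen = Math.max(maxOpen, open);
  return new Promise((resolve) =>
    setTimeout(() => {
      open--;
      resolve(`${action} ok`);
    }, 5),
  );
}

// Wrap each call in a function so nothing starts until a lane is free.
const done: Promise<string[]> = throttleActions(
  listOfActions.map((action) => () => doSomething(action)),
  10,
);
done.then((results) =>
  console.log(`finished ${results.length} actions, peak open calls: ${maxOpen}`),
);
```

The important part is the `.map((action) => () => doSomething(action))` step: passing `doSomething(action)` directly would start all the calls immediately, which is the unthrottled version from the question.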

Jeff Bowman answered Oct 03 '22 20:10


EDIT

Jeff Bowman has vastly improved his answer to resolve meaningful values. Feel free to view the history of this answer to understand why the resolved values are so important/useful.


throttlep

This solution closely mimics the native Promise.all

How it's the same …

  • Resolves promises as quickly as possible
  • Resolves an array of values in the same order as the inputs
  • Rejects as soon as it encounters one reject

How it's different …

  • Number parameter limits the number of simultaneously-running Promises
  • Array input accepts promise creators (thunks); not actual Promises
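
Why thunks rather than Promises? A Promise starts its work as soon as it is created, so an array of Promises is already running at full concurrency before any limiter sees it. A small sketch of the difference, where request is a hypothetical function that just records when it starts:

```typescript
const log: string[] = [];

// Hypothetical request function: records the moment the work begins.
function request(name: string): Promise<string> {
  log.push(`started ${name}`);
  return new Promise((resolve) => setTimeout(() => resolve(name), 10));
}

// An array of promises: both requests are already in flight on this line.
const eager: Promise<string>[] = [request("a"), request("b")];

// An array of thunks: nothing has started yet, so a limiter decides when.
const lazy: Array<() => Promise<string>> = [
  () => request("c"),
  () => request("d"),
];

console.log(log.length); // 2, only "a" and "b" have started

// Calling a thunk is what actually kicks off its request.
const third: Promise<string> = lazy[0]();
console.log(log.length); // 3, "c" has now started as well

Promise.all([...eager, third]).then((names) => console.log(names));
```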

// throttlep :: Number -> [(* -> Promise a)] -> Promise [a]
const throttlep = n=> Ps=>
  new Promise ((pass, fail)=> {
    // r is the number of unresolved promises, xs is the final resolved value,
    // j is the index of the next thunk to start
    let r = Ps.length, xs = [], j = n
    // with no inputs there is nothing to wait for
    if (r === 0) return pass(xs)
    // save the resolved value in position i and decrement r; resolve xs when
    // r reaches 0, otherwise start the next thunk if any are left
    let next = i=> x=> {
      xs[i] = x
      if (--r === 0) pass(xs)
      else if (j < Ps.length) run(Ps[j], j++)
    }
    // run a thunk and chain next; the first rejection rejects the whole pool
    let run = (P,i)=> P().then(next(i), fail)
    // initialize by running the first n promises
    Ps.slice(0,n).forEach(run)
  })

// -----------------------------------------------------
// make sure it works

// delay :: (a, Number) -> Promise a
const delay = (id, ms)=>
  new Promise (pass=> {
    console.log (`running: ${id}`)
    setTimeout(pass, ms, id)
  })

// ps :: [(* -> Promise)]
let ps = new Array(10)
for (let i = 0; i < 10; i++) {
  ps[i] = () => delay(i, Math.random() * 3000)
}

// run a limit of 3 promises in parallel
// the first error will reject the entire pool
throttlep (3) (ps) .then (
  xs => console.log ('result:', xs),
  err=> console.log ('error:', err.message)
)

Console output

Inputs are started in order; resolved results are in the same order as the inputs.

running: 0
running: 1
running: 2
=> Promise {}
running: 3
running: 4
running: 5
running: 6
running: 7
running: 8
running: 9
result: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Practical use

Let's look at a more practical code example. This code is tasked with fetching a set of images from a server. This is how we might use throttlep to limit the number of simultaneous requests to 3 at a time.

// getImage :: String -> Promise<base64>
let getImage = url=> makeRequest(url).then(data => data.base64, reqErrorHandler)

// actions :: [(* -> Promise<base64>)]
let actions = [
  ()=> getImage('one.jpg'),
  ()=> getImage('two.jpg'),
  ()=> getImage('three.jpg'),
  ()=> getImage('four.jpg'),
  ()=> getImage('five.jpg')
]

// throttle the actions then do something...
throttlep (3) (actions) .then(results => {
  // results are guaranteed to be ordered the same as the input array
  console.log(results)
  // [<base64>, <base64>, <base64>, <base64>, <base64>]
})
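
For readers who prefer async/await, the same image example can be sketched as a small pool of workers. This is a different formulation from throttlep, not a translation of it: runLimited and fakeGetImage are hypothetical names, and fakeGetImage only simulates a request (it fails for 'bad.jpg') so the sketch runs without a server. One design choice here is an assumption of mine: after the first failure, the workers stop picking up new thunks.

```typescript
async function runLimited<T>(
  limit: number,
  thunks: Array<() => Promise<T>>,
): Promise<T[]> {
  const results: T[] = new Array(thunks.length);
  let next = 0;       // index of the next thunk to start
  let failed = false; // set once any thunk rejects

  // Each worker pulls the next unstarted thunk until none are left
  // or another worker has failed.
  async function worker(): Promise<void> {
    while (!failed && next < thunks.length) {
      const index = next++;
      try {
        results[index] = await thunks[index]();
      } catch (err) {
        failed = true;
        throw err; // rejects this worker, and therefore Promise.all below
      }
    }
  }

  const count = Math.min(limit, thunks.length);
  await Promise.all(Array.from({ length: count }, () => worker()));
  return results;
}

// Hypothetical stand-in for getImage: simulates a request, fails on 'bad.jpg'.
const fakeGetImage = (url: string): Promise<string> =>
  new Promise((resolve, reject) =>
    setTimeout(() => {
      if (url === "bad.jpg") reject(new Error(`cannot load ${url}`));
      else resolve(`<base64 of ${url}>`);
    }, 5),
  );

const urls = ["one.jpg", "two.jpg", "three.jpg", "four.jpg", "five.jpg"];

runLimited(3, urls.map((url) => () => fakeGetImage(url))).then(
  (images) => console.log(images),
  (err: Error) => console.log("error:", err.message),
);
```

Requests that are already in flight when one fails cannot be cancelled by this sketch; they simply finish and their results are discarded.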

Mulan answered Oct 03 '22 21:10