
ES6 Promise replacement of async.eachLimit / async.mapLimit

In async, if I need to apply an asynchronous function to 1000 items, I can do that with:

async.mapLimit(items, 10, (item, callback) => {
    foo(item, callback);
});

so that only 10 items are processed at the same time, limiting overhead and allowing control.

With ES6 promises, while I can easily do:

Promise.all(items.map((item) => {
    return bar(item);
}));

that would process all 1000 items at the same time, which may cause a lot of problems.

I know Bluebird has ways to handle that, but I am looking for an ES6 solution.

DrakaSAN asked May 10 '17 12:05




2 Answers

If you don't care about the results, then it's quick to whip one up:

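Promise.eachLimit = async (funcs, limit) => {
  // Functions beyond the first `limit`, shared by all workers below.
  let rest = funcs.slice(limit);
  // Start `limit` workers; each runs one function, then keeps pulling from `rest`.
  await Promise.all(funcs.slice(0, limit).map(async func => {
    await func();
    while (rest.length) {
      await rest.shift()();
    }
  }));
};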

// Demo:

var wait = ms => new Promise(resolve => setTimeout(resolve, ms));

async function foo(s) {
  await wait(Math.random() * 2000);
  console.log(s);
}

(async () => {
  let funcs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".split("").map(s => () => foo(s));
  await Promise.eachLimit(funcs, 5);
})();

A key performance property is running the next available function as soon as any function finishes.
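
One way to check that property is to count how many functions are in flight at once. The following is a minimal sketch, assuming a standalone copy of the eachLimit idea above (not attached to Promise); the `running` and `peak` counters and the `task` function are hypothetical instrumentation, not part of the answer's code.

```javascript
// Standalone copy of the eachLimit idea above (an assumption: same logic,
// just not attached to the global Promise object).
async function eachLimit(funcs, limit) {
  const rest = funcs.slice(limit);
  await Promise.all(funcs.slice(0, limit).map(async func => {
    await func();
    while (rest.length) {
      await rest.shift()();
    }
  }));
}

// Hypothetical instrumentation: count tasks in flight and remember the peak.
let running = 0;
let peak = 0;
const wait = ms => new Promise(resolve => setTimeout(resolve, ms));

async function task() {
  running++;
  peak = Math.max(peak, running);
  await wait(Math.random() * 20);
  running--;
}

// Run 30 tasks with a limit of 5 and report the highest concurrency seen.
const done = eachLimit(Array.from({ length: 30 }, () => task), 5)
  .then(() => console.log("peak concurrency:", peak));
```

Because each worker starts its next function only after its current one settles, the peak should never exceed the limit, while a finished worker never waits on a slower sibling.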

Preserving results

Preserving the results in order makes it a little less elegant perhaps, but not too bad:

Promise.mapLimit = async (funcs, limit) => {
  let results = [];
  await Promise.all(funcs.slice(0, limit).map(async (func, i) => {
    results[i] = await func();
    // `limit` doubles as the index of the next unstarted function.
    while ((i = limit++) < funcs.length) {
      results[i] = await funcs[i]();
    }
  }));
  return results;
};

// Demo:

var wait = ms => new Promise(resolve => setTimeout(resolve, ms));

async function foo(s) {
  await wait(Math.random() * 2000);
  console.log(s);
  return s.toLowerCase();
}

(async () => {
  let funcs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".split("").map(s => () => foo(s));
  console.log((await Promise.mapLimit(funcs, 5)).join(""));
})();
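
The question maps a function over items rather than calling an array of thunks, so the same idea could be reshaped to take the items and a mapper directly. This is only a sketch under assumptions: the `mapLimit(items, limit, mapper)` signature and the `worker` helper are hypothetical names, `limit` is assumed to be a positive integer, and `bar` is a stand-in for the question's function.

```javascript
// Hypothetical helper: resolves to the mapped results in input order,
// running at most `limit` mapper calls at a time.
async function mapLimit(items, limit, mapper) {
  const results = new Array(items.length);
  let next = 0; // index of the next unclaimed item, shared by all workers

  async function worker() {
    while (next < items.length) {
      const i = next++; // claimed synchronously, before any await
      results[i] = await mapper(items[i], i);
    }
  }

  // Start up to `limit` workers and wait for all of them to drain the items.
  const workers = Array.from({ length: Math.min(limit, items.length) }, worker);
  await Promise.all(workers);
  return results;
}

// Usage: `bar` is a stand-in for the question's bar(item).
const bar = async item => item * 2;
mapLimit([1, 2, 3, 4, 5], 2, bar).then(results => console.log(results));
```

As with the versions above, a rejection from any call rejects the returned promise, but workers that are already running are not cancelled.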

jib answered Sep 28 '22 02:09


There's nothing built in, but you can of course group them yourself into promise chains, and use a Promise.all on the resulting array of chains:

const items = /* ...1000 items... */;
const concurrencyLimit = 10;
const promise = Promise.all(items.reduce((promises, item, index) => {
    // What chain do we add it to?
    const chainNum = index % concurrencyLimit;
    let chain = promises[chainNum];
    if (!chain) {
        // New chain
        chain = promises[chainNum] = Promise.resolve();
    }
    // Add it
    promises[chainNum] = chain.then(_ => foo(item));
    return promises;
}, []));

Here's an example, showing how many concurrent promises there are at any given time (and also showing when each "chain" is complete, and only doing 200 instead of 1,000):

const items = buildItems();
const concurrencyLimit = 10;
const promise = Promise.all(items.reduce((promises, item, index) => {
    const chainNum = index % concurrencyLimit;
    let chain = promises[chainNum];
    if (!chain) {
        chain = promises[chainNum] = Promise.resolve();
    }
    promises[chainNum] = chain.then(_ => foo(item));
    return promises;
}, []).map(chain => chain.then(_ => console.log("Chain done"))));
promise.then(_ => console.log("All done"));

function buildItems() {
  const items = [];
  for (let n = 0; n < 200; ++n) {
    items[n] = n;
  }
  return items;
}

var outstanding = 0;
function foo(item) {
  ++outstanding;
  console.log("Starting " + item + " (" + outstanding + ")");
  return new Promise(resolve => {
    setTimeout(_ => {
      --outstanding;
      console.log("Resolving " + item + " (" + outstanding + ")");
      resolve(item);
    }, Math.random() * 500);
  });
}

I should note that if you want to track the result of each of those, you'd have to modify the above; it doesn't try to track the results (!). :-)
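
One possible modification, sketched here under assumptions, is to have each link in a chain store its value at the item's original index. The `mapInChains` name and its `fn` parameter are hypothetical, and `foo` below is a stand-in for the function above.

```javascript
// Hypothetical variant of the chain approach that also collects results.
function mapInChains(items, concurrencyLimit, fn) {
  const results = new Array(items.length);
  const chains = items.reduce((promises, item, index) => {
    // Same round-robin assignment of items to chains as above.
    const chainNum = index % concurrencyLimit;
    const chain = promises[chainNum] || Promise.resolve();
    promises[chainNum] = chain
      .then(() => fn(item))
      .then(value => { results[index] = value; }); // keep input order
    return promises;
  }, []);
  // Resolve with the collected results once every chain has finished.
  return Promise.all(chains).then(() => results);
}

// Usage: `foo` is a stand-in that resolves with the square of the item.
const wait = ms => new Promise(resolve => setTimeout(resolve, ms));
const foo = item => wait(Math.random() * 20).then(() => item * item);
mapInChains([0, 1, 2, 3, 4, 5, 6], 3, foo).then(results => console.log(results));
```

Since items are assigned to chains up front, a chain that finishes early cannot take over work from a slower one; that is the trade-off for the simpler bookkeeping.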

T.J. Crowder answered Sep 28 '22 02:09