Say I want to fetch 10 URLs concurrently,
and process the responses as they are received
(which may be in a different order from the order
in which they appear in the original list).
Ignoring the possibility of rejections, one way to do this is simply to attach a "then" callback
to each promise, and then wait for them all to finish
using Promise.all().
const fetch_promises = [
  fetch("https://cors-demo.glitch.me/allow-cors"),
  fetch("/"),
  fetch("."),
  fetch(""),
  fetch("https://enable-cors.org"),
  fetch("https://html5rocks-cors.s3-website-us-east-1.amazonaws.com/index.html"),
  fetch("https://api.github.com"),
  fetch("https://api.flickr.com/services/rest/"),
];
const processing_promises = [];
for (const fetch_promise of fetch_promises) {
  processing_promises.push(fetch_promise.then(response => {
    // Process response. In this example, that means just
    // print it.
    console.log("got a response: ",response);
  }));
}
await Promise.all(processing_promises);
Switching to an example with clearer and more deterministic output:
const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
  sleep(3000).then(()=>"slept 3000"),
  sleep(1000).then(()=>"slept 1000"),
  sleep(5000).then(()=>"slept 5000"),
  sleep(4000).then(()=>"slept 4000"),
  sleep(2000).then(()=>"slept 2000"),
];
const processing_promises = [];
for (const sleep_promise of sleep_promises) {
  processing_promises.push(sleep_promise.then(result => {
    console.log("promise resolved: ",result);
  }));
}
await Promise.all(processing_promises);
The output is as expected:
15:54:16.331 promise resolved: slept 1000
15:54:17.331 promise resolved: slept 2000
15:54:18.331 promise resolved: slept 3000
15:54:19.332 promise resolved: slept 4000
15:54:20.331 promise resolved: slept 5000
My question is this: suppose I want to, or need to, express the processing described above as a "for await...of" loop instead of "then" callbacks, so the promises' results need to come out in the form of an async iterable. How would I convert the array of promises to such an async iterable? What I'm asking for is an async generator function AwaitAsTheyCome(), taking as input a list of promises, which yields the results one by one as the promises resolve. I'd then call the function, and do the processing, as follows:
for await (const result of AwaitAsTheyCome(sleep_promises)) {
  console.log("promise resolved: ",result);
}
It should give the same output (with the same timing) as above.
The following attempted solution obviously doesn't work, but it may give an idea of how simple and short I expect this to be:
async function* AwaitAsTheyCome(promises) {
  for (const promise of promises) {
    promise.then(response => {
      yield response; // WRONG
      // I want to yield it from AwaitAsTheyCome,
      // not from the current arrow function!
    });
  }
}
The following solution does work, but it's more code than I expected to have to write for this.
async function* AwaitAsTheyCome(promises) {
  // Make a list of notifier promises and
  // functions that resolve those promises,
  // one for each of the original promises.
  const notifier_promises = [];
  const notifier_resolves = [];
  for (const promise of promises) {
    notifier_promises.push(
      new Promise(resolve=>notifier_resolves.push(resolve)));
  }
  const responses = [];
  for (const promise of promises) {
    promise.then(response => {
      responses.push(response);
      // send one notification (i.e. resolve the next notifier promise)
      notifier_resolves.shift()();
    });
  }
  for (const promise of promises) {
    // wait for one notification
    // (i.e. wait for the next notifier promise to be resolved).
    await notifier_promises.shift();
    // yield the corresponding response
    yield responses.shift();
  }
}
// Example/test usage
const sleep = millis => new Promise(resolve=>setTimeout(resolve, millis));
const sleep_promises = [
  sleep(3000).then(()=>"slept 3000"),
  sleep(1000).then(()=>"slept 1000"),
  sleep(5000).then(()=>"slept 5000"),
  sleep(4000).then(()=>"slept 4000"),
  sleep(2000).then(()=>"slept 2000"),
];
for await (const result of AwaitAsTheyCome(sleep_promises)) {
  console.log("promise resolved: ",result);
}
Is there a simpler way to implement the async generator function AwaitAsTheyCome?
(I tried making a stacksnippet out of the above code, but it didn't work-- I suspect this is because the snippets system doesn't understand the new async generator and/or for await..of syntax)
You can simplify the code a bit by not keeping a responses array but simply fulfilling the promises, and by not calling .shift() on the promises array but simply looping over it:
async function* raceAll(input) {
  const promises = [];
  const resolvers = [];
  for (const p of input) {
    promises.push(new Promise(resolve=> {
      resolvers.push(resolve);
    }));
    p.then(result => {
      resolvers.shift()(result);
    });
  }
  for (const promise of promises) {
    yield promise;
  }
}
If you don't like the amount of code required, I would recommend factoring out the queue that this implements into a separate module. With e.g. this implementation, the code can become as simple as
function raceAll(promises) {
  const queue = new AsyncBlockingQueue();
  for (const p of promises) {
    p.then(result => {
      queue.enqueue(result);
    });
  }
  return queue[Symbol.asyncIterator]();
}
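The AsyncBlockingQueue used above is not shown in this answer. As a rough idea of what such a queue could look like, here is a minimal sketch. The method names enqueue() and dequeue() and the internal fields are assumptions made for illustration, and the implementation referred to above may well differ.

```javascript
// Hypothetical minimal queue, written only to illustrate the idea;
// not necessarily the implementation the answer refers to.
class AsyncBlockingQueue {
  constructor() {
    this.promises = [];  // promises not yet handed out to a consumer
    this.resolvers = []; // resolve functions of promises still waiting for a value
  }
  _add() {
    this.promises.push(new Promise(resolve => this.resolvers.push(resolve)));
  }
  enqueue(value) {
    // No consumer is waiting: create a promise to buffer the value.
    if (!this.resolvers.length) this._add();
    this.resolvers.shift()(value);
  }
  dequeue() {
    // Nothing is buffered: create a promise that a later enqueue() fulfills.
    if (!this.promises.length) this._add();
    return this.promises.shift();
  }
  [Symbol.asyncIterator]() {
    return {
      // This sketch never reports done: true.
      next: () => this.dequeue().then(value => ({ done: false, value })),
      [Symbol.asyncIterator]() { return this; },
    };
  }
}
```

With a queue like this sketch, the iterator never finishes on its own, so a for await loop over it would have to break once it has received as many results as there were input promises.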
However, both of these implementations miss a crucial issue: error handling. If any of these promises rejects, you'll get an unhandled rejection error, which may crash your process. To actually get the async iterator to reject the next promise, so that a try/catch around a for await…of loop may handle it, you'd need to do something like
async function* raceAll(input) {
  const promises = [];
  const resolvers = [];
  for (const p of input) {
    promises.push(new Promise(resolve => {
      resolvers.push(resolve);
    }));
    // Use the same callback for both outcomes. p.finally() would return a
    // derived promise that rejects along with p, and nothing would handle
    // that rejection.
    const settle = () => resolvers.shift()(p);
    p.then(settle, settle);
    // works equivalent to:
    // p.then(result => {
    //   resolvers.shift()(result);
    // }, error => {
    //   resolvers.shift()(Promise.reject(error));
    // });
  }
  for (const promise of promises) {
    yield promise;
  }
}
Resolving the promise with a rejected promise does the trick so that we still only need one queue of resolver functions, not one containing both resolve and reject functions.
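To see why this works in isolation, here is a small sketch of the underlying behaviour: calling resolve() with a promise makes the new promise follow that promise, including its rejection. The names adopt() and demo() are made up for this illustration.

```javascript
// adopt() is a hypothetical helper name used only for this illustration.
function adopt(p) {
  // Calling resolve(p) with a promise p makes the new promise follow p:
  // it fulfills if p fulfills and rejects if p rejects.
  return new Promise(resolve => resolve(p));
}

async function demo() {
  const failed = Promise.reject(new Error("boom"));
  try {
    await adopt(failed);
    return "fulfilled";
  } catch (e) {
    return "rejected: " + e.message;
  }
}
```

Here demo() settles with "rejected: boom", even though only the resolve function was ever called on the new promise.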
You could work with Promise.race to get a promise for the next resolving one, then remove that promise from the list and repeat.
async function* raceAll(promises) {
  // Don't mutate the original array, and have Promise.race work with the
  // chained promises, so that if there is a rejection, the caller's
  // error handler will keep it from bubbling up unhandled.
  promises = promises.map(p => p = p.then(val => {
    promises.splice(promises.indexOf(p), 1);
    return val;
  }));
  while (promises.length) yield Promise.race(promises);
}
// Demo
const delay = (ms, val, err) => new Promise((resolve, reject) =>
  setTimeout(() => err ? reject(err) : resolve(val), ms)
);
(async function main() {
  const promises = [
    delay(200, 200),
    delay(500, 500), // Add third argument to trigger rejection
    delay(100, 100),
    delay(400, 400)
  ];
  try {
    for await (const val of raceAll(promises)) {
      console.log(val);
    }
  } catch(e) {
    console.log("caught", e);
  }
})();
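One thing to watch with the Promise.race version above: an entry is removed from the array as soon as its promise fulfills, not when its value is yielded. If the loop body awaits something, promises that fulfill in the meantime are gone from the array before the next race, and their values are never yielded. A possible variant that removes an entry only when its value is handed out is sketched below. The name raceAllBuffered is made up here, and this is a sketch under that assumption rather than part of the answer above.

```javascript
// raceAllBuffered is a hypothetical name; a sketch, not the answer's own code.
async function* raceAllBuffered(promises) {
  // Tag every promise with its index so the winner of a race can be identified.
  const pending = new Map(
    promises.map((p, index) => [
      index,
      Promise.resolve(p).then(value => ({ index, value })),
    ])
  );
  while (pending.size) {
    // If several entries have already settled, the earliest one in the
    // Map's insertion order wins the race.
    const { index, value } = await Promise.race(pending.values());
    // Remove the entry only now that its value is about to be yielded.
    pending.delete(index);
    yield value;
  }
}
```

The trade-off is ordering: promises that settle while the consumer is busy come out in input order rather than in the order in which they settled.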