Suppose I have some async iterable objects like this:
const a = {
[Symbol.asyncIterator]: async function * () {
yield 'a';
await sleep(1000);
yield 'b';
await sleep(2000);
yield 'c';
},
};
const b = {
[Symbol.asyncIterator]: async function * () {
await sleep(6000);
yield 'i';
yield 'j';
await sleep(2000);
yield 'k';
},
};
const c = {
[Symbol.asyncIterator]: async function * () {
yield 'x';
await sleep(2000);
yield 'y';
await sleep(8000);
yield 'z';
await sleep(10000);
throw new Error('You have gone too far! ');
},
};
And for completeness:
// Promisified sleep function
const sleep = ms => new Promise((resolve, reject) => {
setTimeout(() => resolve(ms), ms);
});
Now, suppose I can concat them like this:
const abcs = async function * () {
yield * a;
yield * b;
yield * c;
};
The (first 9) items yielded will be:
(async () => {
const limit = 9;
let i = 0;
const xs = [];
for await (const x of abcs()) {
xs.push(x);
i++;
if (i === limit) {
break;
}
}
console.log(xs);
})().catch(error => console.error(error));
// [ 'a', 'b', 'c', 'i', 'j', 'k', 'x', 'y', 'z' ]
But imagine that I do not care about the order, that a, b and c yield at different speeds, and that I want to yield as quickly as possible.
How can I rewrite this loop so that xs are yielded as soon as possible, ignoring order?
It is also possible that a, b or c are infinite sequences, so the solution must not require all elements to be buffered into an array.
There is no way to write this with a loop statement. async/await code always executes sequentially; to do things concurrently you need to use promise combinators directly. For plain promises there is Promise.all; for async iterators there is nothing (yet), so we need to write it on our own:
async function* combine(iterable) {
const asyncIterators = Array.from(iterable, o => o[Symbol.asyncIterator]());
const results = [];
let count = asyncIterators.length;
const never = new Promise(() => {});
function getNext(asyncIterator, index) {
return asyncIterator.next().then(result => ({
index,
result,
}));
}
const nextPromises = asyncIterators.map(getNext);
try {
while (count) {
const {index, result} = await Promise.race(nextPromises);
if (result.done) {
nextPromises[index] = never;
results[index] = result.value;
count--;
} else {
nextPromises[index] = getNext(asyncIterators[index], index);
yield result.value;
}
}
} finally {
for (const [index, iterator] of asyncIterators.entries())
if (nextPromises[index] != never && iterator.return != null)
iterator.return();
// no await here - see https://github.com/tc39/proposal-async-iteration/issues/126
}
return results;
}
Notice that combine does not support passing values into next or cancellation through .throw or .return.
You can call it like
(async () => {
for await (const x of combine([a, b, c])) {
console.log(x);
}
})().catch(console.error);
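To see why a plain loop cannot interleave the sources while Promise.race can, here is a minimal sketch. The names delay, slow, fast, sequentialFirst and racedFirst are made up for this illustration and are not part of the answer above.

```javascript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const delay = (ms, value) =>
  new Promise(resolve => setTimeout(resolve, ms, value));

async function* slow() { yield await delay(50, 'slow'); }
async function* fast() { yield await delay(10, 'fast'); }

// Sequential: fast().next() is not even requested until slow().next()
// has settled, so results always come back in source order.
async function sequentialFirst() {
  const order = [];
  for (const it of [slow(), fast()]) {
    order.push((await it.next()).value);
  }
  return order;
}

// Concurrent: both next() calls are pending at the same time and
// Promise.race settles with whichever result arrives first.
async function racedFirst() {
  const pending = [slow(), fast()].map(it => it.next());
  const winner = await Promise.race(pending);
  return winner.value;
}
```

combine applies the same idea repeatedly: it keeps one pending next() promise per source and replaces only the one that just won the race.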
If I change abcs to accept the generators to process, I come up with this (see inline comments):
const abcs = async function * (...gens) {
// Worker function to queue up the next result
const queueNext = async (e) => {
e.result = null; // Release previous one as soon as possible
e.result = await e.it.next();
return e;
};
// Map the generators to source objects in a map, get and start their
// first iteration
const sources = new Map(gens.map(gen => [
gen,
queueNext({
key: gen,
it: gen[Symbol.asyncIterator]()
})
]));
// While we still have any sources, race the current promise of
// the sources we have left
while (sources.size) {
const winner = await Promise.race(sources.values());
// Completed the sequence?
if (winner.result.done) {
// Yes, drop it from sources
sources.delete(winner.key);
} else {
// No, grab the value to yield and queue up the next
// Then yield the value
const {value} = winner.result;
sources.set(winner.key, queueNext(winner));
yield value;
}
}
};
Live Example:
// Promisified sleep function
const sleep = ms => new Promise((resolve, reject) => {
setTimeout(() => resolve(ms), ms);
});
const a = {
[Symbol.asyncIterator]: async function * () {
yield 'a';
await sleep(1000);
yield 'b';
await sleep(2000);
yield 'c';
},
};
const b = {
[Symbol.asyncIterator]: async function * () {
await sleep(6000);
yield 'i';
yield 'j';
await sleep(2000);
yield 'k';
},
};
const c = {
[Symbol.asyncIterator]: async function * () {
yield 'x';
await sleep(2000);
yield 'y';
await sleep(8000);
yield 'z';
},
};
const abcs = async function * (...gens) {
// Worker function to queue up the next result
const queueNext = async (e) => {
e.result = null; // Release previous one as soon as possible
e.result = await e.it.next();
return e;
};
// Map the generators to source objects in a map, get and start their
// first iteration
const sources = new Map(gens.map(gen => [
gen,
queueNext({
key: gen,
it: gen[Symbol.asyncIterator]()
})
]));
// While we still have any sources, race the current promise of
// the sources we have left
while (sources.size) {
const winner = await Promise.race(sources.values());
// Completed the sequence?
if (winner.result.done) {
// Yes, drop it from sources
sources.delete(winner.key);
} else {
// No, grab the value to yield and queue up the next
// Then yield the value
const {value} = winner.result;
sources.set(winner.key, queueNext(winner));
yield value;
}
}
};
(async () => {
console.log("start");
for await (const x of abcs(a, b, c)) {
console.log(x);
}
console.log("done");
})().catch(error => console.error(error));
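One thing this version leaves open is what happens when the consumer stops early, for example with break: the sources that are still running are never told to stop. The following is only a sketch of one possible variant, assuming that closing abandoned sources is what you want. The names abcsWithCleanup and iterators are introduced here for illustration.

```javascript
// Sketch: abcs plus a try/finally that closes sources the consumer abandoned.
// `abcsWithCleanup` and `iterators` are names invented for this example.
async function* abcsWithCleanup(...gens) {
  const iterators = new Map(
    gens.map(gen => [gen, gen[Symbol.asyncIterator]()])
  );
  // Request the next result of one source, tagged with its key
  const queueNext = async (key) => ({
    key,
    result: await iterators.get(key).next(),
  });
  const sources = new Map(gens.map(gen => [gen, queueNext(gen)]));
  try {
    while (sources.size) {
      const winner = await Promise.race(sources.values());
      if (winner.result.done) {
        sources.delete(winner.key);
      } else {
        sources.set(winner.key, queueNext(winner.key));
        yield winner.result.value;
      }
    }
  } finally {
    // Runs on normal completion (nothing left to close), on an error,
    // and when the consumer leaves the loop early.
    for (const [key, pending] of sources) {
      // The abandoned next() may still reject later; mark it as handled.
      pending.catch(() => {});
      const it = iterators.get(key);
      if (typeof it.return === 'function') {
        // Not awaited, so a source stuck in a long await cannot block the exit.
        Promise.resolve(it.return()).catch(() => {});
      }
    }
  }
}
```

Because return() is not awaited, a source's own finally block may run slightly after the consuming loop has already exited.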
This is a complicated task, so I’m going to break it up into individual parts:
Before we even think about creating an async iterator, we should first consider the task of simply logging each value from each iterator to the console as it arrives. As with most concurrent tasks in JavaScript, this involves calling multiple async functions and awaiting their results with Promise.all.
function merge(iterables) {
return Promise.all(
Array.from(iterables).map(async (iter) => {
for await (const value of iter) {
console.log(value);
}
}),
);
}
// a, b and c are the async iterables defined in the question
merge([a, b, c]); // a, x, b, y, c, i, j, k, z, Error: You have gone too far!
CodeSandbox link: https://codesandbox.io/s/tender-ives-4hijy?fontsize=14
The merge function logs values from each iterator, but is mostly useless; it returns a promise which fulfills to an array of undefined when all iterators finish.
The next step is to replace console.log calls with calls to a function which pushes to a parent async iterator. To do this with an async generator, we need a little bit more code, because the only way to “push” a value onto an async generator is with the yield operator, which can’t be used in child function scopes. The solution is to create two queues, a push queue and a pull queue. Next, we define a push function which either resolves a pending pull, if there is one, or enqueues the value on the push queue to be pulled later. Finally, we have to perpetually yield either values from the push queue, if it has any, or promises which enqueue a resolve function on the pull queue to be called by push later. Here’s the code:
async function *merge(iterables) {
// pushQueue and pullQueue will never both contain values at the same time.
const pushQueue = [];
const pullQueue = [];
function push(value) {
if (pullQueue.length) {
pullQueue.pop()(value);
} else {
pushQueue.unshift(value);
}
}
// the merge code from step 1
const finishP = Promise.all(
Array.from(iterables).map(async (iter) => {
for await (const value of iter) {
push(value);
}
}),
);
while (true) {
if (pushQueue.length) {
yield pushQueue.pop();
} else {
// important to note that yield in an async generator implicitly awaits promises.
yield new Promise((resolve) => {
pullQueue.unshift(resolve);
});
}
}
}
// code from the question
(async () => {
const limit = 9;
let i = 0;
const xs = [];
for await (const x of merge([a, b, c])) {
xs.push(x);
console.log(x);
i++;
if (i === limit) {
break;
}
}
console.log(xs); // ["a", "x", "b", "y", "c", "i", "j", "k", "z"]
})().catch(error => console.error(error));
CodeSandbox link: https://codesandbox.io/s/misty-cookies-du1eg
This almost works! If you run the code, you’ll notice that xs is correctly printed, but the break statement does not stop the child iterators: values continue to be pulled from them, so the error in c is eventually thrown, resulting in an unhandled promise rejection. Also note that we don’t do anything with the result of the Promise.all call. Ideally, when the finishP promise settles, the generator should be returned. We need just a little bit more code to make sure that 1. the child iterators are returned when the parent iterator is returned (with a break statement in a for await loop, for instance), and 2. the parent iterator is returned when all child iterators return.
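Before adding cleanup, it may help to look at the push/pull queue handoff from the previous step on its own. This is only a sketch: createChannel is a hypothetical helper, and it uses push/shift instead of unshift/pop, which gives the same first-in, first-out order.

```javascript
// Hypothetical helper illustrating the two-queue handoff in isolation.
// At most one of the two queues is non-empty at any time.
function createChannel() {
  const pushQueue = []; // values waiting for a consumer
  const pullQueue = []; // resolve functions waiting for a value
  return {
    push(value) {
      if (pullQueue.length) {
        // A consumer is already waiting: hand the value over directly.
        pullQueue.shift()(value);
      } else {
        // Nobody is waiting: store the value until it is pulled.
        pushQueue.push(value);
      }
    },
    pull() {
      if (pushQueue.length) {
        return Promise.resolve(pushQueue.shift());
      }
      // Nothing buffered: park a resolver that push() will call later.
      return new Promise(resolve => pullQueue.push(resolve));
    },
  };
}
```

In merge, the child loops play the role of push callers and the while loop in the generator plays the role of the pull caller.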
To make sure each child async iterable is correctly returned when the parent async generator is returned, we can use a finally block to listen for the completion of the parent async generator. And to make sure the parent generator is returned when the child iterators return, we can race yielded promises against the finishP promise.
async function *merge(iterables) {
const pushQueue = [];
const pullQueue = [];
function push(value) {
if (pullQueue.length) {
pullQueue.pop()(value);
} else {
pushQueue.unshift(value);
}
}
// we create a promise to race calls to iter.next
let stop;
const stopP = new Promise((resolve) => (stop = resolve));
let finished = false;
const finishP = Promise.all(
Array.from(iterables).map(async (iter) => {
// we use the iterator interface rather than the iterable interface
iter = iter[Symbol.asyncIterator]();
try {
while (true) {
// because we can’t race promises with for await, we have to call iter.next manually
const result = await Promise.race([stopP, iter.next()]);
if (!result || result.done) {
return;
}
push(result.value);
}
} finally {
// we should be a good citizen and return child iterators
if (iter.return) await iter.return();
}
}),
).finally(() => (finished = true));
try {
while (!finished) {
if (pushQueue.length) {
yield pushQueue.pop();
} else {
const value = await Promise.race([
new Promise((resolve) => {
pullQueue.unshift(resolve);
}),
finishP,
]);
if (!finished) {
yield value;
}
}
}
// we await finishP to make the iterator catch any promise rejections
await finishP;
} finally {
stop();
}
}
CodeSandbox link: https://codesandbox.io/s/vigilant-leavitt-h247u
There are some things we still need to do before this code is production ready. For instance, values are pulled from the child iterators continuously, without waiting for the parent iterator to pull them. This, combined with the fact that pushQueue is an unbounded array, can cause memory leaks if the parent iterator pulls values at a slower pace than the child iterators produce them.
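To make the memory concern concrete, the sketch below compares eager forwarding with pull-driven iteration on a source that counts how many values it has produced. countingSource, eagerBufferSize and pullDrivenProduced are hypothetical names used only for this illustration.

```javascript
// Hypothetical source that counts how many values it has produced.
function countingSource(limit) {
  const stats = { produced: 0 };
  const iterable = {
    async *[Symbol.asyncIterator]() {
      for (let i = 0; i < limit; i++) {
        stats.produced++;
        yield i;
      }
    },
  };
  return { iterable, stats };
}

// Eager forwarding: this loop stands in for the child loops in merge.
// It drains the source into an unbounded buffer even though the
// consumer has not taken a single value yet.
async function eagerBufferSize(limit) {
  const { iterable, stats } = countingSource(limit);
  const buffer = [];
  for await (const value of iterable) {
    buffer.push(value);
  }
  return { produced: stats.produced, buffered: buffer.length };
}

// Pull-driven: next() is called only when the consumer asks for a value,
// so the source never runs ahead of the consumer.
async function pullDrivenProduced(limit) {
  const { iterable, stats } = countingSource(limit);
  const it = iterable[Symbol.asyncIterator]();
  await it.next(); // the consumer takes a single value
  return stats.produced;
}
```

With limit set to 1000, the eager version ends up holding all 1000 values, while the pull-driven version has produced only the one value that was requested.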
Additionally, the merge iterator returns undefined as its final value, but you might want the final value to be the final value from the last-completing child iterator.
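For context on what a "final value" is: a generator's return value is delivered on the result object that has done set to true, and for await...of discards it. A minimal sketch, with withReturnValue, collectWithForAwait and collectManually as made-up names:

```javascript
async function* withReturnValue() {
  yield 1;
  yield 2;
  return 'final';
}

// for await...of only sees yielded values; the return value is dropped.
async function collectWithForAwait() {
  const seen = [];
  for await (const value of withReturnValue()) {
    seen.push(value);
  }
  return seen;
}

// Calling next() manually exposes the return value on the last result.
async function collectManually() {
  const it = withReturnValue();
  const seen = [];
  let result = await it.next();
  while (!result.done) {
    seen.push(result.value);
    result = await it.next();
  }
  return { seen, returned: result.value };
}
```

So a merge function that wants to forward a child's final value has to drive the children with next() rather than for await...of.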
If you’re looking for a small, focused library which has a merge function like the one above and covers some more use cases and edge cases, check out Repeater.js, which I wrote. It defines the static method Repeater.merge, which does what I described above. It also provides a clean API for turning callback-based APIs into promises and other combinator static methods to combine async iterators in other ways.
In case anyone finds it useful, here's a TypeScript version of the currently accepted answer:
const combineAsyncIterables = async function* <T>(
asyncIterables: AsyncIterable<T>[],
): AsyncGenerator<T> {
const asyncIterators = Array.from(asyncIterables, (o) =>
o[Symbol.asyncIterator](),
);
const results = [];
let count = asyncIterators.length;
const never: Promise<never> = new Promise(() => {});
const getNext = (asyncIterator: AsyncIterator<T>, index: number) =>
asyncIterator.next().then((result) => ({ index, result }));
const nextPromises = asyncIterators.map(getNext);
try {
while (count) {
const { index, result } = await Promise.race(nextPromises);
if (result.done) {
nextPromises[index] = never;
results[index] = result.value;
count--;
} else {
nextPromises[index] = getNext(asyncIterators[index], index);
yield result.value;
}
}
} finally {
for (const [index, iterator] of asyncIterators.entries()) {
if (nextPromises[index] != never && iterator.return != null) {
// no await here - see https://github.com/tc39/proposal-async-iteration/issues/126
void iterator.return();
}
}
}
return results;
};