
How can I interleave / merge async iterables?

Suppose I have some async iterable objects like this:

const a = {
  [Symbol.asyncIterator]: async function * () {
    yield 'a';
    await sleep(1000);
    yield 'b';
    await sleep(2000);
    yield 'c';
  }, 
};

const b = {
  [Symbol.asyncIterator]: async function * () {
    await sleep(6000);
    yield 'i';
    yield 'j';
    await sleep(2000);
    yield 'k';
  }, 
};

const c = {
  [Symbol.asyncIterator]: async function * () {
    yield 'x';
    await sleep(2000);
    yield 'y';
    await sleep(8000);
    yield 'z';
    await sleep(10000);
    throw new Error('You have gone too far! ');
  }, 
};

And for completeness:

// Promisified sleep function
const sleep = ms => new Promise((resolve, reject) => {
  setTimeout(() => resolve(ms), ms);
});

Now, suppose I can concat them like this:

const abcs = async function * () {
  yield * a;
  yield * b;
  yield * c;
};

The (first 9) items yielded will be:

(async () => {
  const limit = 9;
  let i = 0; 
  const xs = [];
  for await (const x of abcs()) {
    xs.push(x);
    i++;
    if (i === limit) {
      break;
    }
  }
  console.log(xs);
})().catch(error => console.error(error));

// [ 'a', 'b', 'c', 'i', 'j', 'k', 'x', 'y', 'z' ]

But imagine that I do not care about the order, that a, b and c yield at different speeds, and that I want to yield as quickly as possible.

How can I rewrite this loop so that xs are yielded as soon as possible, ignoring order?


It is also possible that a, b or c are infinite sequences, so the solution must not require all elements to be buffered into an array.

sdgfsdh asked May 29 '18 13:05


4 Answers

There is no way to write this with a loop statement. async/await code always executes sequentially; to do things concurrently you need to use promise combinators directly. For plain promises there's Promise.all; for async iterators there is nothing (yet), so we need to write it on our own:

async function* combine(iterable) {
    const asyncIterators = Array.from(iterable, o => o[Symbol.asyncIterator]());
    const results = [];
    let count = asyncIterators.length;
    const never = new Promise(() => {});
    function getNext(asyncIterator, index) {
        return asyncIterator.next().then(result => ({
            index,
            result,
        }));
    }
    const nextPromises = asyncIterators.map(getNext);
    try {
        while (count) {
            const {index, result} = await Promise.race(nextPromises);
            if (result.done) {
                nextPromises[index] = never;
                results[index] = result.value;
                count--;
            } else {
                nextPromises[index] = getNext(asyncIterators[index], index);
                yield result.value;
            }
        }
    } finally {
        for (const [index, iterator] of asyncIterators.entries())
            if (nextPromises[index] != never && iterator.return != null)
                iterator.return();
        // no await here - see https://github.com/tc39/proposal-async-iteration/issues/126
    }
    return results;
}

Notice that combine does not support passing values into next or cancellation through .throw or .return.

You can call it like

(async () => {
  for await (const x of combine([a, b, c])) {
    console.log(x);
  }
})().catch(console.error);
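
To illustrate the requirement from the question that the sources may be infinite, here is a minimal sketch that merges two endless generators and stops after five values. The ticker helper, the closed array and takeFive are hypothetical names introduced for this example, and combine is restated in condensed form (without the results bookkeeping) only so that the snippet runs on its own.

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Hypothetical bookkeeping: records which sources had their finally block run.
const closed = [];

// Hypothetical endless source: yields `label` every `ms` milliseconds, forever.
async function* ticker(label, ms) {
  try {
    while (true) {
      await sleep(ms);
      yield label;
    }
  } finally {
    closed.push(label);
  }
}

// Condensed restatement of combine from above, without the results array.
async function* combine(iterable) {
  const iterators = Array.from(iterable, o => o[Symbol.asyncIterator]());
  const never = new Promise(() => {});
  const getNext = (it, index) => it.next().then(result => ({ index, result }));
  const nextPromises = iterators.map(getNext);
  let count = iterators.length;
  try {
    while (count) {
      const { index, result } = await Promise.race(nextPromises);
      if (result.done) {
        nextPromises[index] = never;
        count--;
      } else {
        nextPromises[index] = getNext(iterators[index], index);
        yield result.value;
      }
    }
  } finally {
    // Reached when the consumer breaks: ask every unfinished source to stop.
    for (const [index, it] of iterators.entries())
      if (nextPromises[index] !== never && it.return != null) it.return();
  }
}

// Consume only the first five values, then leave the loop.
async function takeFive() {
  const seen = [];
  for await (const x of combine([ticker('fast', 10), ticker('slow', 35)])) {
    seen.push(x);
    if (seen.length === 5) break;
  }
  return seen;
}
```

Because return() is not awaited, each source only runs its finally block once its in-flight next() call has settled, so closed fills in shortly after the loop exits rather than immediately. The value produced by that in-flight call is dropped.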
Bergi answered Nov 14 '22 17:11


If I change abcs to accept the generators to process, I come up with this (see the inline comments):

const abcs = async function * (...gens) {
  // Worker function to queue up the next result
  const queueNext = async (e) => {
    e.result = null; // Release previous one as soon as possible
    e.result = await e.it.next();
    return e;
  };
  // Map the generators to source objects in a map, get and start their
  // first iteration
  const sources = new Map(gens.map(gen => [
    gen,
    queueNext({
      key: gen,
      it:  gen[Symbol.asyncIterator]()
    })
  ]));
  // While we still have any sources, race the current promise of
  // the sources we have left
  while (sources.size) {
    const winner = await Promise.race(sources.values());
    // Completed the sequence?
    if (winner.result.done) {
      // Yes, drop it from sources
      sources.delete(winner.key);
    } else {
      // No, grab the value to yield and queue up the next
      // Then yield the value
      const {value} = winner.result;
      sources.set(winner.key, queueNext(winner));
      yield value;
    }
  }
};

Live Example:

// Promisified sleep function
const sleep = ms => new Promise((resolve, reject) => {
  setTimeout(() => resolve(ms), ms);
});

const a = {
  [Symbol.asyncIterator]: async function * () {
    yield 'a';
    await sleep(1000);
    yield 'b';
    await sleep(2000);
    yield 'c';
  }, 
};

const b = {
  [Symbol.asyncIterator]: async function * () {
    await sleep(6000);
    yield 'i';
    yield 'j';
    await sleep(2000);
    yield 'k';
  }, 
};

const c = {
  [Symbol.asyncIterator]: async function * () {
    yield 'x';
    await sleep(2000);
    yield 'y';
    await sleep(8000);
    yield 'z';
  }, 
};

const abcs = async function * (...gens) {
  // Worker function to queue up the next result
  const queueNext = async (e) => {
    e.result = null; // Release previous one as soon as possible
    e.result = await e.it.next();
    return e;
  };
  // Map the generators to source objects in a map, get and start their
  // first iteration
  const sources = new Map(gens.map(gen => [
    gen,
    queueNext({
      key: gen,
      it:  gen[Symbol.asyncIterator]()
    })
  ]));
  // While we still have any sources, race the current promise of
  // the sources we have left
  while (sources.size) {
    const winner = await Promise.race(sources.values());
    // Completed the sequence?
    if (winner.result.done) {
      // Yes, drop it from sources
      sources.delete(winner.key);
    } else {
      // No, grab the value to yield and queue up the next
      // Then yield the value
      const {value} = winner.result;
      sources.set(winner.key, queueNext(winner));
      yield value;
    }
  }
};

(async () => {
  console.log("start");
  for await (const x of abcs(a, b, c)) {
    console.log(x);
  }
  console.log("done");
})().catch(error => console.error(error));
T.J. Crowder answered Nov 14 '22 18:11


This is a complicated task, so I’m going to break it up into individual parts:

Step 1: logging each value from each async iterable to the console

Before we even think about creating an async iterator, we should first consider the task of simply logging each value from each iterator to the console as it arrives. As with most concurrent tasks in JavaScript, this involves calling multiple async functions and awaiting their results with Promise.all.

function merge(iterables) {
  return Promise.all(
    Array.from(iterables).map(async (iter) => {
      for await (const value of iter) {
        console.log(value);
      }
    }),
  );
}

// a, b and c are the async iterables defined in the question
merge([a, b, c]); // a, x, b, y, c, i, j, k, z, Error: You have gone too far!

CodeSandbox link: https://codesandbox.io/s/tender-ives-4hijy?fontsize=14

The merge function logs values from each iterator, but is mostly useless; it returns a promise which fulfills to an array of undefined when all iterators finish.
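
The "array of undefined" result follows from each mapped async function finishing without a return statement. A minimal sketch of that behaviour, assuming a hypothetical fromArray helper and a drainAll function that takes a callback in place of console.log so the values can be inspected:

```javascript
// Hypothetical helper: turns a plain array into an async iterable.
async function* fromArray(values) {
  for (const value of values) {
    yield value;
  }
}

// Same shape as merge above, but with a callback instead of console.log.
function drainAll(iterables, onValue) {
  return Promise.all(
    Array.from(iterables).map(async (iter) => {
      for await (const value of iter) {
        onValue(value);
      }
      // No return statement, so this async function fulfills with undefined.
    }),
  );
}

async function demo() {
  const seen = [];
  const result = await drainAll(
    [fromArray(['a', 'b']), fromArray(['x', 'y'])],
    (value) => seen.push(value),
  );
  // seen holds all four values; result holds one undefined per iterable.
  return { seen, result };
}
```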

Step 2: Replacing the merge function with a merge async generator

The next step is to replace console.log calls with calls to a function which pushes to a parent async iterator. To do this with an async generator, we need a little bit more code, because the only way to “push” a value onto an async generator is with the yield operator, which can’t be used in child function scopes. The solution is to create two queues, a push queue and a pull queue. Next, we define a push function which either resolves the oldest pending pull, if there is one, or enqueues the value on the push queue to be pulled later. Finally, we perpetually yield either values from the push queue, if it has any, or promises whose resolve functions are enqueued on the pull queue to be called by push later. Here’s the code:

async function *merge(iterables) {
  // pushQueue and pullQueue will never both contain values at the same time.
  const pushQueue = [];
  const pullQueue = [];
  function push(value) {
    if (pullQueue.length) {
      pullQueue.pop()(value);
    } else {
      pushQueue.unshift(value);
    }
  }

  // the merge code from step 1
  const finishP = Promise.all(
    Array.from(iterables).map(async (iter) => {
      for await (const value of iter) {
        push(value);
      }
    }),
  );

  while (true) {
    if (pushQueue.length) {
      yield pushQueue.pop();
    } else {
      // important to note that yield in an async generator implicitly awaits promises.
      yield new Promise((resolve) => {
        pullQueue.unshift(resolve);
      });
    }
  }
}

// code from the question
(async () => {
  const limit = 9;
  let i = 0; 
  const xs = [];
  for await (const x of merge([a, b, c])) {
    xs.push(x);
    console.log(x);
    i++;
    if (i === limit) {
      break;
    }
  }
  console.log(xs); // ["a", "x", "b", "y", "c", "i", "j", "k", "z"]
})().catch(error => console.error(error));

CodeSandbox link: https://codesandbox.io/s/misty-cookies-du1eg

This almost works! If you run the code, you’ll notice that xs is printed correctly, but the break statement is not respected: values continue to be pulled from the child iterators, so the error in c is eventually thrown, resulting in an unhandled promise rejection. Also note that we don’t do anything with the result of the Promise.all call. Ideally, when the finishP promise settles, the generator should be returned. We need just a little bit more code to make sure that 1. the child iterators are returned when the parent iterator is returned (with a break statement in a for await loop, for instance), and 2. the parent iterator is returned when all child iterators return.
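
The first point relies on how for await interacts with generators: leaving the loop early calls return() on the iterator, and for a generator this runs any enclosing finally block. A minimal sketch of that mechanism, where numbers, log and takeThree are hypothetical names used only for illustration:

```javascript
const log = [];

// Hypothetical endless source with a cleanup step.
async function* numbers() {
  try {
    let n = 0;
    while (true) {
      yield n++;
    }
  } finally {
    // Runs when the consumer breaks out of the loop (or calls return()).
    log.push('cleanup');
  }
}

async function takeThree() {
  const taken = [];
  for await (const n of numbers()) {
    taken.push(n);
    if (taken.length === 3) {
      break; // implicitly calls return() on the generator
    }
  }
  return taken;
}
```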

Step 3: stopping each child iterator when the parent iterator is returned, and the parent iterator when every child has returned.

To make sure each child async iterable is correctly returned when the parent async generator is returned, we can use a finally block to listen for the completion of the parent async generator. And to make sure the parent generator is returned when the child iterators return, we can race yielded promises against the finishP promise.

async function *merge(iterables) {
  const pushQueue = [];
  const pullQueue = [];
  function push(value) {
    if (pullQueue.length) {
      pullQueue.pop()(value);
    } else {
      pushQueue.unshift(value);
    }
  }
  // we create a promise to race calls to iter.next
  let stop;
  const stopP = new Promise((resolve) => (stop = resolve));
  let finished = false;
  const finishP = Promise.all(
    Array.from(iterables).map(async (iter) => {
      // we use the iterator interface rather than the iterable interface
      iter = iter[Symbol.asyncIterator]();
      try {
        while (true) {
          // because we can’t race promises with for await, we have to call iter.next manually
          const result = await Promise.race([stopP, iter.next()]);
          if (!result || result.done) {
            return;
          }
          push(result.value);
        }
      } finally {
        // we should be a good citizen and return child iterators
        // (parenthesized so that the promise from iter.return() is what gets awaited)
        iter.return && (await iter.return());
      }
    }),
  ).finally(() => (finished = true));

  try {
    while (!finished) {
      if (pushQueue.length) {
        yield pushQueue.pop();
      } else {
        const value = await Promise.race([
          new Promise((resolve) => {
            pullQueue.unshift(resolve);
          }),
          finishP,
        ]);
        if (!finished) {
          yield value;
        }
      }
    }

    // we await finishP to make the iterator catch any promise rejections
    await finishP;
  } finally {
    stop();
  }
}

CodeSandbox link: https://codesandbox.io/s/vigilant-leavitt-h247u

There are some things we still need to do before this code is production ready. For instance, values are pulled from the child iterators continuously, without waiting for the parent iterator to pull them. Because pushQueue is an unbounded array, memory use can grow without limit if the parent iterator pulls values at a slower pace than the child iterators produce them.
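
The difference between eager and pull-driven consumption can be seen in a small experiment. This is only a sketch: makeCounter and compare are hypothetical names, and the eager loop mimics the push(value) loop from merge rather than reusing it.

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Hypothetical source that counts how many values it has produced so far.
function makeCounter(limit) {
  const stats = { produced: 0 };
  async function* values() {
    for (let i = 0; i < limit; i++) {
      stats.produced++;
      yield i;
    }
  }
  return { stats, values };
}

async function compare() {
  // Push style: drain the source into a buffer as fast as it produces.
  const eager = makeCounter(1000);
  const buffer = [];
  const draining = (async () => {
    for await (const value of eager.values()) buffer.push(value);
  })();
  await sleep(20); // a slow consumer that has not read anything yet
  const buffered = buffer.length;
  await draining;

  // Pull style: the source only advances when the consumer calls next().
  const lazy = makeCounter(1000);
  const it = lazy.values();
  await it.next();
  await sleep(20);
  const producedLazily = lazy.stats.produced;
  await it.return();

  return { buffered, producedLazily };
}
```

In the push style, all 1000 values are sitting in the buffer before the consumer has read one. In the pull style, only the single requested value has been produced.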

Additionally, the merge iterator returns undefined as its final value, but you might want the final value to be the final value from the last-completing child iterator.
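
Note that a for await loop discards a generator's return value, so the final value can only be observed by calling next() manually. A short sketch with a hypothetical letters generator:

```javascript
async function* letters() {
  yield 'a';
  yield 'b';
  return 'final'; // the generator's return value
}

async function collect() {
  const it = letters();
  const yielded = [];
  let step = await it.next();
  while (!step.done) {
    yielded.push(step.value);
    step = await it.next();
  }
  // Once done is true, value holds what the generator returned.
  return { yielded, returned: step.value };
}
```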

If you’re looking for a small, focused library which has a merge function like the one above which covers some more use-cases and edge-cases, check out Repeater.js, which I wrote. It defines the static method Repeater.merge, which does what I described above. It also provides a clean API for turning callback-based APIs into promises and other combinator static methods to combine async iterators in other ways.

brainkim answered Nov 14 '22 16:11


In case anyone finds it useful, here's a TypeScript version of the currently accepted answer:


const combineAsyncIterables = async function* <T>(
  asyncIterables: AsyncIterable<T>[],
): AsyncGenerator<T> {
  const asyncIterators = Array.from(asyncIterables, (o) =>
    o[Symbol.asyncIterator](),
  );
  const results = [];
  let count = asyncIterators.length;
  // a promise that never settles (the original referenced an undefined noOp helper)
  const never: Promise<never> = new Promise(() => {});
  const getNext = (asyncIterator: AsyncIterator<T>, index: number) =>
    asyncIterator.next().then((result) => ({ index, result }));

  const nextPromises = asyncIterators.map(getNext);
  try {
    while (count) {
      const { index, result } = await Promise.race(nextPromises);
      if (result.done) {
        nextPromises[index] = never;
        results[index] = result.value;
        count--;
      } else {
        nextPromises[index] = getNext(asyncIterators[index], index);
        yield result.value;
      }
    }
  } finally {
    for (const [index, iterator] of asyncIterators.entries()) {
      if (nextPromises[index] != never && iterator.return != null) {
        // no await here - see https://github.com/tc39/proposal-async-iteration/issues/126
        void iterator.return();
      }
    }
  }
  return results;
}; 
Ben Elgar answered Nov 14 '22 17:11