
How to "multicast" an async iterable?

Can an async generator be somehow broadcast or multicast, so that all its iterators ("consumers"? subscribers?) receive all values?

Consider this example:

const fetchMock = () => "Example. Imagine real fetch";
async function* gen() {
  for (let i = 1; i <= 6; i++) {
    const res = await fetchMock();
    yield res.slice(0, 2) + i;
  }
}
const ait = gen();

(async() => {
  // first "consumer"
  for await (const e of ait) console.log('e', e);
})();
(async() => {
  // second...
  for await (const é of ait) console.log('é', é);
})();

Each iteration "consumes" a value, so only one consumer or the other gets it. I would like both of them (and any later ones) to get every yielded value, if it is possible to create such a generator. (Similar to an Observable.)

ᅙᄉᅙ asked Aug 23 '20 04:08



1 Answer

There is no built-in way to do this. You need to explicitly tee the iterator, that is, split it into two iterators that each receive every value. This is similar to the situation for synchronous iterators, just a bit more complicated:

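// %AsyncIteratorPrototype% has no global name, so reach it through an async generator's prototype chain.
const AsyncIteratorProto = Object.getPrototypeOf(Object.getPrototypeOf(async function*(){}.prototype));
function teeAsync(iterable) {
    const iterator = iterable[Symbol.asyncIterator]();
    // One buffer per branch; a branch's buffer is set to null once that branch is closed.
    const buffers = [[], []];
    function makeIterator(buffer, i) {
        return Object.assign(Object.create(AsyncIteratorProto), {
            next() {
                if (!buffer) return Promise.resolve({done: true, value: undefined});
                // Replay results the other branch has already pulled.
                if (buffer.length) return buffer.shift();
                // Otherwise pull from the source and queue the same promise for the other branch (index i^1).
                const res = iterator.next();
                if (buffers[i^1]) buffers[i^1].push(res);
                return res;
            },
            async return() {
                if (buffer) {
                    buffer = buffers[i] = null;
                    // Close the source only once both branches have been closed.
                    if (!buffers[i^1]) await iterator.return();
                }
                return {done: true, value: undefined};
            },
        });
    }
    return buffers.map(makeIterator);
}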

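You should ensure that both iterators are consumed at about the same rate. Every result pulled by the faster iterator is held in the slower one's buffer until it is read, so a lagging consumer makes that buffer grow without bound.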
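Since the question also asks about "any later" consumers, here is a minimal sketch of how the same buffering idea might be extended from two branches to a fixed number `n`. The names `multicast`, `collect`, and `demo` are hypothetical and not part of the answer above, and the branch count has to be chosen up front: a consumer that attaches later would still miss earlier values.

```javascript
// Hypothetical helper: split one async iterable into `n` branches that each see every value.
function multicast(iterable, n) {
  const source = iterable[Symbol.asyncIterator]();
  // One queue of pending result promises per branch; null marks a closed branch.
  const queues = Array.from({ length: n }, () => []);
  return queues.map((_, i) => ({
    [Symbol.asyncIterator]() { return this; },
    next() {
      const queue = queues[i];
      if (!queue) return Promise.resolve({ done: true, value: undefined });
      // Replay results that another branch has already pulled.
      if (queue.length) return queue.shift();
      // Pull once from the source and share the same promise with every other open branch.
      const result = source.next();
      queues.forEach((q, j) => { if (q && j !== i) q.push(result); });
      return result;
    },
    async return() {
      if (queues[i]) {
        queues[i] = null;
        // Close the source only after the last branch has been closed.
        if (queues.every(q => q === null) && source.return) await source.return();
      }
      return { done: true, value: undefined };
    },
  }));
}

// Simplified stand-in for the generator in the question.
async function* gen() {
  for (let i = 1; i <= 3; i++) yield "Ex" + i;
}

// Drain one branch into an array.
async function collect(branch) {
  const seen = [];
  for await (const value of branch) seen.push(value);
  return seen;
}

// Three consumers reading concurrently from the same source.
async function demo() {
  const branches = multicast(gen(), 3);
  return Promise.all(branches.map(collect));
}
```

Calling `demo()` should resolve to three arrays, each containing all three yielded values. The rate caveat still applies: a branch that lags behind accumulates pending results in its queue.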

Bergi answered Nov 07 '22 01:11
