How can I use a event emitter as an async generator

I'm trying to use the neat syntax of async generator with babel (I'm stuck with node 8) and I'm wondering how you would convert an event emitter to an async generator cleanly

What I got so far look like this

    const { EventEmitter } = require('events')

    // defer fonction for resolving promises out of scope
    const Defer = () => {
      let resolve
      let reject
      let promise = new Promise((a, b) => {
        resolve = a
        reject = b
      return {

    // my iterator function
    function readEvents(emitter, channel) {
      const buffer = [Defer()]
      let subId = 0

      emitter.on(channel, x => {
        const promise = buffer[subId]

      const gen = async function*() {
        while (true) {
          const val = await buffer[0].promise
          yield val

      return gen()

    async function main () {
      const emitter = new EventEmitter()
      const iterator = readEvents(emitter, 'data')

      // this part generates events
      let i = 0
      setInterval(() => {
        emitter.emit('data', i++)
      }, 1000)

      // this part reads events
      for await (let val of iterator) {


This is unweildy - can it be simplified?

3 Answers

I came up with this:

async *stream<TRecord extends object=Record<string,any>>(query: SqlFrag): AsyncGenerator<TRecord> {
    const sql = query.toSqlString();

    let results: TRecord[] = [];
    let resolve: () => void;
    let promise = new Promise(r => resolve = r);
    let done = false;

        .on('error', err => {
            throw err;
        .on('result', row => {
            promise = new Promise(r => resolve = r);
        .on('end', () => {
            done = true;

    while(!done) {
        await promise;
        yield* results;
        results = [];

Seems to be working so far.

i.e. you create a dummy promise like in Khanh's solution so that you can wait for the first result, but then because many results might come in all at once, you push them into an array and reset the promise to wait for the result (or batch of results). It doesn't matter if this promise gets overwritten dozens of times before its ever awaited.

Then we can yield all the results at once with yield* and flush the array for the next batch.

Let's say we use redux-saga (as it uses generator at its core) and socket.io as an example of EventEmitter

import { call, put } from 'redux-saga/effects';

function* listen() {
  yield (function* () {
    let resolve;
    let promise = new Promise(r => resolve = r); // The defer

    socket.on('messages created', message => {
      console.log('Someone created a message', message);
      resolve(message); // Resolving the defer

      promise = new Promise(r => resolve = r); // Recreate the defer for the next cycle

    while (true) {
      const message = yield promise; // Once the defer is resolved, message has some value
      yield put({ type: 'SOCKET_MESSAGE', payload: [message] });

export default function* root() {
    yield call(listen);

The above setup should give you a generator that's blocked by the next event to be emitted by an event emitter (socket.io instance).


Here is another take at this, handling timer tick events with for await loop, custom Symbol.asyncIterator and a simple queue for any potential event buffering. Works in both Node and browser environments (RunKit, Gist).

async function main() {
  const emitter = createEmitter();
  const start = Date.now();

  setInterval(() => emitter.emit(Date.now() - start), 1000);

  for await (const item of emitter) {
    console.log(`tick: ${item}`);

main().catch(e => console.warn(`caught on main: ${e.message}`));

function createEmitter() {
  const queue = [];
  let resolve;

  const push = p => {
    if (resolve) {
      resolve = null;

  const emitError = e => push(Promise.reject(e));

  return {
    emit: v => push(Promise.resolve(v)),
    throw: emitError,

    [Symbol.asyncIterator]: () => ({
      next: async () => {
        while(!queue.length) {
          await new Promise((...a) => [resolve] = a);
        return { value: await queue.pop(), done: false };
      throw: emitError
