How can I use an event emitter as an async generator?

I'm trying to use the neat async generator syntax with Babel (I'm stuck on Node 8), and I'm wondering how to convert an event emitter into an async generator cleanly.

What I have so far looks like this:

    const { EventEmitter } = require('events')

    // Deferred helper: exposes a promise's resolve/reject outside the executor
    const Defer = () => {
      let resolve
      let reject
      let promise = new Promise((a, b) => {
        resolve = a
        reject = b
      })
      return {
        promise,
        reject,
        resolve
      }
    }


    // my iterator function
    function readEvents(emitter, channel) {
      const buffer = [Defer()]
      let subId = 0

      emitter.on(channel, x => {
        const promise = buffer[subId]
        subId++
        buffer.push(Defer())
        promise.resolve(x)
      })

      const gen = async function*() {
        while (true) {
          const val = await buffer[0].promise
          buffer.shift()
          subId--
          yield val
        }
      }

      return gen()
    }

    async function main () {
      const emitter = new EventEmitter()
      const iterator = readEvents(emitter, 'data')

      // this part generates events
      let i = 0
      setInterval(() => {
        emitter.emit('data', i++)
      }, 1000)

      // this part reads events
      for await (let val of iterator) {
        console.log(val)
      }
    }

    main()

This is unwieldy. Can it be simplified?

Overcl9ck asked Jun 26 '18 14:06



3 Answers

I came up with this:

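    async *stream<TRecord extends object = Record<string, any>>(query: SqlFrag): AsyncGenerator<TRecord> {
        const sql = query.toSqlString();

        const results: TRecord[] = [];
        let error: unknown;
        let done = false;
        let resolve: () => void;
        let promise = new Promise<void>(r => resolve = r);

        // Wake the loop below, then arm a fresh promise for the next batch.
        const signal = () => {
            resolve();
            promise = new Promise<void>(r => resolve = r);
        };

        this.pool.query(sql)
            .on('error', err => {
                // A throw inside a listener never reaches the consumer,
                // so record the error and rethrow it from the loop.
                error = err;
                signal();
            })
            .on('result', row => {
                results.push(row);
                signal();
            })
            .on('end', () => {
                done = true;
                signal(); // without this, the loop would wait forever after the last row
            });

        while (true) {
            if (results.length) {
                yield* results.splice(0); // yield the current batch and empty the array
            } else if (error) {
                throw error;
            } else if (done) {
                return;
            } else {
                await promise;
            }
        }
    }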

Seems to be working so far.

That is, you create a placeholder promise, as in Khanh's solution, so that you can wait for the first result. Because many results might arrive at once, you push them into an array and reset the promise to wait for the next result (or batch of results). It doesn't matter if this promise gets overwritten dozens of times before it's ever awaited.

Then we can yield all the results at once with yield* and flush the array for the next batch.
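
The same batching idea can be sketched in plain JavaScript against a Node `EventEmitter`. This is only an illustration under stated assumptions: `batchedEvents` is a hypothetical helper name, and the sketch assumes the emitter signals completion with an `'end'` event (the `endName` parameter). It is not mpen's code and not a library API.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper, not part of Node or any library.
// Yields every `eventName` payload and finishes once `endName` is emitted.
// Listeners are attached when iteration starts (on the first next() call).
async function* batchedEvents(emitter, eventName, endName = 'end') {
  const batch = [];
  let done = false;
  let wake;
  let pending = new Promise(r => { wake = r; });

  // Resolve the promise the consumer may be waiting on, then re-arm it.
  const signal = () => {
    wake();
    pending = new Promise(r => { wake = r; });
  };

  const onEvent = value => { batch.push(value); signal(); };
  const onEnd = () => { done = true; signal(); };
  emitter.on(eventName, onEvent);
  emitter.once(endName, onEnd);

  try {
    while (true) {
      if (batch.length > 0) {
        yield* batch.splice(0); // hand over the whole batch, leaving `batch` empty
      } else if (done) {
        return;
      } else {
        await pending;
      }
    }
  } finally {
    // Runs on normal completion and when the consumer breaks out of the loop.
    emitter.removeListener(eventName, onEvent);
    emitter.removeListener(endName, onEnd);
  }
}
```

A consumer would write `for await (const value of batchedEvents(emitter, 'data')) { ... }`. Checking the batch before awaiting means events that arrive while the consumer is busy are not lost, and the `finally` block detaches the listeners if the consumer stops early.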

mpen answered Oct 12 '22 17:10


Let's say we use redux-saga (it uses generators at its core), with socket.io as an example of an EventEmitter:

    import { call, put } from 'redux-saga/effects';

    // `socket` is assumed to be an already-connected socket.io client instance.
    function* listen() {
      yield (function* () {
        let resolve;
        let promise = new Promise(r => resolve = r); // The defer

        socket.on('messages created', message => {
          console.log('Someone created a message', message);
          resolve(message); // Resolving the defer

          promise = new Promise(r => resolve = r); // Recreate the defer for the next cycle
        });

        // Caveat: only one message is held at a time; a message that arrives
        // before the saga is back at `yield promise` is dropped.
        while (true) {
          const message = yield promise; // Once the defer is resolved, message has some value
          yield put({ type: 'SOCKET_MESSAGE', payload: [message] });
        }
      })();
    }

    export default function* root() {
      yield call(listen);
    }

The above setup should give you a generator that blocks until the next event is emitted by the event emitter (here, a socket.io instance).

Cheers!

Khanh Hua answered Oct 12 '22 17:10


Here is another take on this: handling timer tick events with a for await loop, a custom Symbol.asyncIterator, and a simple queue for any potential event buffering. It works in both Node and browser environments.

    async function main() {
      const emitter = createEmitter();
      const start = Date.now();

      setInterval(() => emitter.emit(Date.now() - start), 1000);

      for await (const item of emitter) {
        console.log(`tick: ${item}`);
      }
    }

    main().catch(e => console.warn(`caught on main: ${e.message}`));

    function createEmitter() {
      const queue = [];   // buffered events, oldest first
      let resolve = null; // wakes a consumer that is waiting on an empty queue

      const push = p => {
        queue.push(p);
        if (resolve) {
          resolve();
          resolve = null;
        }
      };

      const emitError = e => push(Promise.reject(e));

      return {
        emit: v => push(Promise.resolve(v)),
        throw: emitError,

        [Symbol.asyncIterator]: () => ({
          next: async () => {
            while (!queue.length) {
              await new Promise(r => { resolve = r; });
            }
            // shift() rather than pop(), so buffered events come out in arrival order
            return { value: await queue.shift(), done: false };
          },
          throw: emitError
        })
      };
    }
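
For comparison, newer Node.js releases ship a built-in `events.on()` that returns an async iterator over an emitter's events, so a hand-rolled queue may not be needed there. The sketch below assumes a Node version that has `events.on()` (as far as I know it was added in 12.16 / 13.6, so it is not available on the Node 8 mentioned in the question). `firstN` is a hypothetical helper name used only for illustration.

```javascript
const { EventEmitter, on } = require('events');

// Hypothetical helper: collect the first `n` payloads of `eventName`.
// events.on() starts listening as soon as it is called and buffers events
// until the loop consumes them.
async function firstN(emitter, eventName, n) {
  const values = [];
  // Each iteration yields the array of arguments passed to emit(),
  // so destructure the first argument.
  for await (const [value] of on(emitter, eventName)) {
    values.push(value);
    if (values.length >= n) break; // leaving the loop detaches the listeners
  }
  return values;
}
```

With an ordinary emitter, `for await (const [value] of on(emitter, 'data'))` plays the role of the loop over the custom emitter above. The iterator never finishes on its own, so the consumer has to `break` or `return` to stop it.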

noseratio answered Oct 12 '22 17:10