I'm trying to use the neat async generator syntax with Babel (I'm stuck on Node 8), and I'm wondering how you would cleanly convert an event emitter to an async generator.
What I have so far looks like this:
    const { EventEmitter } = require('events')

    // defer function for resolving promises out of scope
    const Defer = () => {
      let resolve
      let reject
      let promise = new Promise((a, b) => {
        resolve = a
        reject = b
      })
      return {
        promise,
        reject,
        resolve
      }
    }

    // my iterator function
    function readEvents(emitter, channel) {
      const buffer = [Defer()]
      let subId = 0
      emitter.on(channel, x => {
        const promise = buffer[subId]
        subId++
        buffer.push(Defer())
        promise.resolve(x)
      })
      const gen = async function*() {
        while (true) {
          const val = await buffer[0].promise
          buffer.shift()
          subId--
          yield val
        }
      }
      return gen()
    }

    async function main () {
      const emitter = new EventEmitter()
      const iterator = readEvents(emitter, 'data')
      // this part generates events
      let i = 0
      setInterval(() => {
        emitter.emit('data', i++)
      }, 1000)
      // this part reads events
      for await (let val of iterator) {
        console.log(val)
      }
    }

    main()
This is unwieldy. Can it be simplified?
Note that Node's event emitters are not asynchronous by nature: emit() is synchronous and blocking. Events are implemented with plain function calls. If you look at the EventEmitter code, sending an event to all listeners literally just iterates through an array of listeners and calls each listener callback, one after the other.
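To make the synchronous dispatch concrete, here is a minimal sketch using Node's built-in events module. The `order` array and the 'data' event name are just illustrative choices, not anything from the question.

```javascript
const { EventEmitter } = require('events');

const emitter = new EventEmitter();
const order = [];

// The listener records when it runs relative to the surrounding code.
emitter.on('data', x => order.push(`listener saw ${x}`));

order.push('before emit');
emitter.emit('data', 1); // the listener runs here, before emit() returns
order.push('after emit');

console.log(order);
// [ 'before emit', 'listener saw 1', 'after emit' ]
```

This is why some kind of buffer or deferred promise is needed when bridging to an async generator: the emitter will not wait for the consumer.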
An async generator is similar to a regular generator, with the following differences: the async keyword is placed in front of the function keyword; the body may use await; and each call to next() returns a Promise that resolves to a { value, done } object instead of returning that object directly. The yielded value is typically the result of an asynchronous operation.
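As a small illustration of those differences, the sketch below defines an async generator and consumes it with for await. The names `delay`, `countTo` and `collect` are hypothetical helpers made up for this example.

```javascript
// Resolves with `value` after `ms` milliseconds.
const delay = (ms, value) => new Promise(resolve => setTimeout(resolve, ms, value));

// An async generator: `async` before `function*`, and `await` allowed in the body.
async function* countTo(n) {
  for (let i = 1; i <= n; i++) {
    yield await delay(10, i);
  }
}

// Drains any async iterable into an array.
async function collect(iterable) {
  const out = [];
  for await (const v of iterable) out.push(v);
  return out;
}

// next() hands back a Promise rather than a { value, done } object.
console.log(countTo(1).next() instanceof Promise); // true

collect(countTo(3)).then(values => console.log(values)); // [ 1, 2, 3 ]
```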
I came up with this:
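    async *stream<TRecord extends object = Record<string, any>>(query: SqlFrag): AsyncGenerator<TRecord> {
      const sql = query.toSqlString();
      let results: TRecord[] = [];
      let resolve!: () => void;
      let promise = new Promise<void>(r => resolve = r);
      let done = false;
      let error: unknown;
      this.pool.query(sql)
        .on('error', err => {
          // Throwing inside the handler would never reach the consumer,
          // so record the error and wake the loop instead.
          error = err;
          done = true;
          resolve();
        })
        .on('result', row => {
          results.push(row);
          resolve();
          promise = new Promise<void>(r => resolve = r);
        })
        .on('end', () => {
          done = true;
          resolve(); // wake the loop so it can exit instead of waiting forever
        });
      while (!done) {
        await promise;
        yield* results;
        results = [];
      }
      if (error) throw error;
    }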
Seems to be working so far.
I.e., you create a dummy promise, as in Khanh's solution, so that you can wait for the first result. Because many results might come in all at once, you push them into an array and reset the promise to wait for the next result (or batch of results). It doesn't matter if this promise gets overwritten dozens of times before it's ever awaited.
Then we can yield all the results at once with yield* and flush the array for the next batch.
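The same batching idea can be sketched in plain JavaScript against a generic EventEmitter. This is only an illustration under assumptions: `batched` is a hypothetical helper, and the 'result', 'end' and 'error' event names are defaults picked to mirror the code above, not a fixed API.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: buffers items between reads and yields them in batches.
async function* batched(emitter, { item = 'result', end = 'end', error = 'error' } = {}) {
  let results = [];
  let failure = null;
  let done = false;
  let wake;
  let signal = new Promise(r => (wake = r));

  // Resolve the current promise, then replace it for the next wait.
  const notify = () => {
    wake();
    signal = new Promise(r => (wake = r));
  };

  emitter.on(item, row => { results.push(row); notify(); });
  emitter.on(end, () => { done = true; notify(); });
  emitter.on(error, err => { failure = err; done = true; notify(); });

  while (true) {
    if (results.length === 0 && !done) await signal;
    const batch = results;
    results = [];        // flush the buffer before yielding
    yield* batch;        // hand out everything collected so far
    if (failure) throw failure;
    if (done && results.length === 0) return;
  }
}

// Usage: start consuming first, then emit.
const source = new EventEmitter();
(async () => {
  for await (const row of batched(source)) console.log('row', row);
  console.log('finished');
})();
setImmediate(() => {
  source.emit('result', 'a');
  source.emit('result', 'b');
  source.emit('end');
});
```

Listeners are only attached once iteration starts, so anything emitted before the first read is missed, and this sketch never removes its listeners. A production version would probably want a try/finally that detaches them.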
Let's say we use redux-saga (as it uses generators at its core) and socket.io as an example of an EventEmitter:
    import { call, put } from 'redux-saga/effects';

    // `socket` is assumed to be an already connected socket.io client instance.
    function* listen() {
      yield (function* () {
        let resolve;
        let promise = new Promise(r => resolve = r); // The defer
        socket.on('messages created', message => {
          console.log('Someone created a message', message);
          resolve(message); // Resolving the defer
          promise = new Promise(r => resolve = r); // Recreate the defer for the next cycle
        });
        while (true) {
          const message = yield promise; // Once the defer is resolved, message has some value
          yield put({ type: 'SOCKET_MESSAGE', payload: [message] });
        }
      })();
    }

    export default function* root() {
      yield call(listen);
    }
The above setup should give you a generator that's blocked by the next event to be emitted by an event emitter (socket.io instance).
Cheers!
Here is another take on this, handling timer tick events with a for await loop, a custom Symbol.asyncIterator, and a simple queue for any potential event buffering. It works in both Node and browser environments (RunKit, Gist).
    async function main() {
      const emitter = createEmitter();
      const start = Date.now();
      setInterval(() => emitter.emit(Date.now() - start), 1000);
      for await (const item of emitter) {
        console.log(`tick: ${item}`);
      }
    }

    main().catch(e => console.warn(`caught on main: ${e.message}`));

    function createEmitter() {
      const queue = [];
      let resolve;
      // Enqueue a promise and wake a pending next() call, if any.
      const push = p => {
        queue.push(p);
        if (resolve) {
          resolve();
          resolve = null;
        }
      };
      const emitError = e => push(Promise.reject(e));
      return {
        emit: v => push(Promise.resolve(v)),
        throw: emitError,
        [Symbol.asyncIterator]: () => ({
          next: async () => {
            while (!queue.length) {
              await new Promise(r => resolve = r);
            }
            // shift() takes the oldest entry, so events come out in emission order
            return { value: await queue.shift(), done: false };
          },
          throw: emitError
        })
      };
    }
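For readers who are not stuck on Node 8: newer Node releases (12.16 / 13.6 and later, as far as I know) ship events.on(), which wraps an EventEmitter in an async iterator with its own buffering, so none of the hand-written queues above are needed. A minimal sketch, where `firstN` and the 'data' event name are hypothetical and only there for illustration:

```javascript
const { EventEmitter, on } = require('events');

// Collects the first `n` values emitted for `eventName`, then stops listening.
async function firstN(emitter, eventName, n) {
  const seen = [];
  // Each iteration yields the array of arguments that were passed to emit().
  for await (const [value] of on(emitter, eventName)) {
    seen.push(value);
    if (seen.length === n) break; // leaving the loop removes the listeners
  }
  return seen;
}

const emitter = new EventEmitter();
let i = 0;
const timer = setInterval(() => emitter.emit('data', i++), 10);

firstN(emitter, 'data', 3).then(values => {
  clearInterval(timer);
  console.log(values); // [ 0, 1, 2 ]
});
```

Without the break (or an abort signal) the loop never ends, since an emitter has no natural "done" state.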