I'm trying to wrap my head around async/await, and I have the following code:
class AsyncQueue<T> {
  queue = Array<T>()
  maxSize = 1
  async enqueue(x: T) {
    // The queue is full once it already holds maxSize items
    if (this.queue.length >= this.maxSize) {
      // Block until available
    }
    this.queue.unshift(x)
  }
  async dequeue() {
    if (this.queue.length == 0) {
      // Block until available
    }
    return this.queue.pop()!
  }
}
async function produce<T>(q: AsyncQueue<T>, x: T) {
  await q.enqueue(x)
}
async function consume<T>(q: AsyncQueue<T>): Promise<T> {
  return await q.dequeue()
}
// Expecting 3 4 in the console
(async () => {
  const q = new AsyncQueue<number>()
  consume(q).then(console.log)
  consume(q).then(console.log)
  produce(q, 3)
  produce(q, 4)
  consume(q).then(console.log)
  consume(q).then(console.log)
})()
My problem, of course, is in the "Block until available" parts of the code. I was expecting to be able to "halt" the execution until something happens (for example, dequeue halts until an enqueue exists, and vice-versa given the available space). I have the feeling I might need to use coroutines for this, but I really wanted to make sure I am just not missing any async/await magic here.
17/04/2019 Update: Long story short, there's a bug in the AsyncSemaphore implementation below, that was caught using property-based testing. You can read all about this "tale" here. Here's the fixed version:
class AsyncSemaphore {
  private promises = Array<() => void>()
  constructor(private permits: number) {}
  signal() {
    this.permits += 1
    if (this.promises.length > 0) this.promises.pop()!()
  }
  async wait() {
    this.permits -= 1
    if (this.permits < 0 || this.promises.length > 0)
      await new Promise(r => this.promises.unshift(r))
  }
}
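To make the behaviour of the fixed semaphore concrete, here is a minimal usage sketch. It restates the class with explicit types (the field name `waiters` and the helper `mutexDemo` are hypothetical names introduced for this illustration) and uses a single permit as a mutex, assuming waiters should be woken in arrival order:

```typescript
// Restatement of the fixed semaphore, with explicit types for `strict` mode.
class AsyncSemaphore {
  private waiters: Array<() => void> = []
  private permits: number

  constructor(permits: number) {
    this.permits = permits
  }

  signal(): void {
    this.permits += 1
    const next = this.waiters.pop() // the oldest waiter sits at the end
    if (next) next()
  }

  async wait(): Promise<void> {
    this.permits -= 1
    if (this.permits < 0 || this.waiters.length > 0) {
      await new Promise<void>(resolve => this.waiters.unshift(() => resolve()))
    }
  }
}

// Hypothetical demo: three workers share one permit, so their critical
// sections never interleave even though each one awaits a timer inside.
async function mutexDemo(): Promise<string[]> {
  const log: string[] = []
  const sem = new AsyncSemaphore(1)
  const worker = async (name: string): Promise<void> => {
    await sem.wait()
    log.push(`${name} start`)
    await new Promise<void>(resolve => setTimeout(resolve, 5))
    log.push(`${name} end`)
    sem.signal()
  }
  await Promise.all([worker("a"), worker("b"), worker("c")])
  return log
}

mutexDemo().then(log => console.log(log.join(", ")))
// prints "a start, a end, b start, b end, c start, c end"
```

Because `wait()` takes its permit before suspending, a caller that arrives between a `signal()` and the woken waiter's continuation queues up behind it instead of overtaking it.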
Finally, after considerable effort, and inspired by @Titian's answer, I think I solved this. The code is filled with debug messages, but it might serve pedagogical purposes regarding the flow of control:
class AsyncQueue<T> {
  waitingEnqueue = new Array<() => void>()
  waitingDequeue = new Array<() => void>()
  enqueuePointer = 0
  dequeuePointer = 0
  queue = Array<T>()
  maxSize = 1
  trace = 0
  async enqueue(x: T) {
    this.trace += 1
    const localTrace = this.trace
    if ((this.queue.length + 1) > this.maxSize || this.waitingDequeue.length > 0) {
      console.debug(`[${localTrace}] Producer Waiting`)
      this.dequeuePointer += 1
      await new Promise(r => this.waitingDequeue.unshift(r))
      this.waitingDequeue.pop()
      console.debug(`[${localTrace}] Producer Ready`)
    }
    this.queue.unshift(x)
    console.debug(`[${localTrace}] Enqueueing ${x} Queue is now [${this.queue.join(', ')}]`)
    if (this.enqueuePointer > 0) {
      console.debug(`[${localTrace}] Notify Consumer`)
      this.waitingEnqueue[this.enqueuePointer-1]()
      this.enqueuePointer -= 1
    }
  }
  async dequeue() {
    this.trace += 1
    const localTrace = this.trace
    console.debug(`[${localTrace}] Queue length before pop: ${this.queue.length}`)
    if (this.queue.length == 0 || this.waitingEnqueue.length > 0) {
      console.debug(`[${localTrace}] Consumer Waiting`)
      this.enqueuePointer += 1
      await new Promise(r => this.waitingEnqueue.unshift(r))
      this.waitingEnqueue.pop()
      console.debug(`[${localTrace}] Consumer Ready`)
    }
    const x = this.queue.pop()!
    console.debug(`[${localTrace}] Queue length after pop: ${this.queue.length} Popping ${x}`)
    if (this.dequeuePointer > 0) {
      console.debug(`[${localTrace}] Notify Producer`)
      this.waitingDequeue[this.dequeuePointer - 1]()
      this.dequeuePointer -= 1
    }
    return x
  }
}
Update: Here's a clean version using an AsyncSemaphore, that really encapsulates the way things are usually done using concurrency primitives, but adapted to the asynchronous-CPS-single-threaded-event-loop™ style of JavaScript with async/await. You can see that the logic of AsyncQueue becomes much more intuitive, and the double synchronisation through Promises is delegated to the two semaphores:
class AsyncSemaphore {
  private promises = Array<() => void>()
  constructor(private permits: number) {}
  signal() {
    this.permits += 1
    if (this.promises.length > 0) this.promises.pop()!()
  }
  async wait() {
    if (this.permits == 0 || this.promises.length > 0)
      await new Promise(r => this.promises.unshift(r))
    this.permits -= 1
  }
}
class AsyncQueue<T> {
  private queue = Array<T>()
  private waitingEnqueue: AsyncSemaphore
  private waitingDequeue: AsyncSemaphore
  constructor(readonly maxSize: number) {
    this.waitingEnqueue = new AsyncSemaphore(0)
    this.waitingDequeue = new AsyncSemaphore(maxSize)
  }
  async enqueue(x: T) {
    await this.waitingDequeue.wait()
    this.queue.unshift(x)
    this.waitingEnqueue.signal()
  }
  async dequeue() {
    await this.waitingEnqueue.wait()
    this.waitingDequeue.signal()
    return this.queue.pop()!
  }
}
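As a quick check against the scenario in the question, the sketch below runs two consumers before two producers on a queue of size 1. It is only an illustration under a few assumptions: the semaphore is the fixed version from the 17/04/2019 update (with explicit types), and `waiters`, `main` and `values` are hypothetical names that do not appear in the original code.

```typescript
// Fixed semaphore from the 17/04/2019 update, with explicit types.
class AsyncSemaphore {
  private waiters: Array<() => void> = []
  private permits: number

  constructor(permits: number) {
    this.permits = permits
  }

  signal(): void {
    this.permits += 1
    const next = this.waiters.pop()
    if (next) next()
  }

  async wait(): Promise<void> {
    this.permits -= 1
    if (this.permits < 0 || this.waiters.length > 0) {
      await new Promise<void>(resolve => this.waiters.unshift(() => resolve()))
    }
  }
}

class AsyncQueue<T> {
  private queue: T[] = []
  private waitingEnqueue = new AsyncSemaphore(0) // counts items ready to dequeue
  private waitingDequeue: AsyncSemaphore         // counts free slots

  constructor(maxSize: number) {
    this.waitingDequeue = new AsyncSemaphore(maxSize)
  }

  async enqueue(x: T): Promise<void> {
    await this.waitingDequeue.wait()
    this.queue.unshift(x)
    this.waitingEnqueue.signal()
  }

  async dequeue(): Promise<T> {
    await this.waitingEnqueue.wait()
    this.waitingDequeue.signal()
    return this.queue.pop()!
  }
}

// Hypothetical driver: both consumers start first and suspend until
// the producers deliver 3 and then 4.
async function main(): Promise<number[]> {
  const q = new AsyncQueue<number>(1)
  const first = q.dequeue()
  const second = q.dequeue()
  void q.enqueue(3)
  void q.enqueue(4)
  return Promise.all([first, second])
}

main().then(values => console.log(values.join(" "))) // prints "3 4"
```

The second `enqueue` suspends on the free-slot semaphore until the first consumer has taken its item, which is the "halt until something happens" behaviour the question asks for.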
Update 2: There seemed to be a subtle bug hidden in the above code, which became evident when trying to use an AsyncQueue of size 0. The semantics do make sense: it is a queue without any buffer, where the publisher always waits for a consumer to exist. The lines that prevented it from working were:
await this.waitingEnqueue.wait()
this.waitingDequeue.signal()
If you look closely, you'll see that dequeue() isn't perfectly symmetric to enqueue(). In fact, if one swaps the order of these two instructions:
this.waitingDequeue.signal()
await this.waitingEnqueue.wait()
Then everything works again; it seems intuitive to me that we should signal that something is interested in dequeuing before actually waiting for an enqueue to take place.
I'm still not sure this doesn't reintroduce subtle bugs, without extensive testing. I'll leave this as a challenge ;)
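As a small starting point for that testing, here is a sketch that exercises only the size-0 case with the swapped order in dequeue(). It assumes the fixed semaphore from the 17/04/2019 update; `waiters`, `rendezvousDemo` and `completedBefore` are hypothetical names, and a single scenario like this is not a substitute for the extensive testing mentioned above.

```typescript
// Fixed semaphore from the 17/04/2019 update, with explicit types.
class AsyncSemaphore {
  private waiters: Array<() => void> = []
  private permits: number

  constructor(permits: number) {
    this.permits = permits
  }

  signal(): void {
    this.permits += 1
    const next = this.waiters.pop()
    if (next) next()
  }

  async wait(): Promise<void> {
    this.permits -= 1
    if (this.permits < 0 || this.waiters.length > 0) {
      await new Promise<void>(resolve => this.waiters.unshift(() => resolve()))
    }
  }
}

class AsyncQueue<T> {
  private queue: T[] = []
  private waitingEnqueue = new AsyncSemaphore(0)
  private waitingDequeue: AsyncSemaphore

  constructor(maxSize: number) {
    this.waitingDequeue = new AsyncSemaphore(maxSize)
  }

  async enqueue(x: T): Promise<void> {
    await this.waitingDequeue.wait()
    this.queue.unshift(x)
    this.waitingEnqueue.signal()
  }

  async dequeue(): Promise<T> {
    // Swapped order: announce the consumer first, then wait for an item.
    this.waitingDequeue.signal()
    await this.waitingEnqueue.wait()
    return this.queue.pop()!
  }
}

// Hypothetical check: with no buffer, producers stay suspended until a
// consumer shows up, and items still arrive in FIFO order.
async function rendezvousDemo(): Promise<{ completedBefore: number; received: number[] }> {
  const q = new AsyncQueue<number>(0)
  let completed = 0
  const p1 = q.enqueue(1).then(() => { completed += 1 })
  const p2 = q.enqueue(2).then(() => { completed += 1 })
  await new Promise<void>(resolve => setTimeout(resolve, 0))
  const completedBefore = completed // no enqueue has finished yet
  const received = [await q.dequeue(), await q.dequeue()]
  await Promise.all([p1, p2])
  return { completedBefore, received }
}

rendezvousDemo().then(r => console.log(r.completedBefore, r.received.join(" ")))
// prints "0 1 2"
```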