Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Asynchronous Bounded Queue in JS/TS using async/await

I'm trying to wrap my head around async/await, and I have the following code:

class AsyncQueue<T> {
    queue = Array<T>()
    maxSize = 1

    async enqueue(x: T) {
        if (this.queue.length > this.maxSize) {
            // Block until available
        }

        this.queue.unshift(x)
    }

    async dequeue() {
        if (this.queue.length == 0) {
            // Block until available
        }

        return this.queue.pop()!
    }
}

async function produce<T>(q: AsyncQueue, x: T) {
    await q.enqueue(x)
}

async function consume<T>(q: AsyncQueue): T {
    return await q.dequeue()
}

// Expecting 3 4 in the console
(async () => {
    const q = new AsyncQueue<number>()
    consume(q).then(console.log)
    consume(q).then(console.log)
    produce(q, 3)
    produce(q, 4)
    consume(q).then(console.log)
    consume(q).then(console.log)
})()

My problem, of course, is in the "Block until available" parts of the code. I was expecting to be able to "halt" the execution until something happens (for example, dequeue halts until an enqueue exists, and vice-versa given the available space). I have the feeling I might need to use coroutines for this, but I really wanted to make sure I am just not missing any async/await magic here.

like image 752
Hugo Sereno Ferreira Avatar asked May 17 '18 02:05

Hugo Sereno Ferreira


People also ask

How do I use async and await in TypeScript?

async / await support in ES6 targets (Node v4+) Asynchronous functions are prefixed with the async keyword; await suspends the execution until an asynchronous function return promise is fulfilled and unwraps the value from the Promise returned.

Is async await really asynchronous?

In computer programming, the async/await pattern is a syntactic feature of many programming languages that allows an asynchronous, non-blocking function to be structured in a way similar to an ordinary synchronous function.

Can I use async await in JavaScript?

await can be used on its own with JavaScript modules. Note: The purpose of async / await is to simplify the syntax necessary to consume promise-based APIs. The behavior of async / await is similar to combining generators and promises. Async functions always return a promise.

Why async await is asynchronous?

Async/await Await is in an async function to ensure that all promises that are returned in the function are synchronized. With async/await, there's no use of callbacks. try and catch methods are also used to get rejection values of async functions.


1 Answers

17/04/2019 Update: Long story short, there's a bug in the AsyncSemaphore implementation below, that was caught using property-based testing. You can read all about this "tale" here. Here's the fixed version:

class AsyncSemaphore {
    private promises = Array<() => void>()

    constructor(private permits: number) {}

    signal() {
        this.permits += 1
        if (this.promises.length > 0) this.promises.pop()!()
    }

    async wait() {
        this.permits -= 1
        if (this.permits < 0 || this.promises.length > 0)
            await new Promise(r => this.promises.unshift(r))
    }
}

Finally, after considerable effort, and inspired by @Titian answer, I think I solved this. The code is filled with debug messages, but it might serve pedagogical purposes regarding the flow of control:

class AsyncQueue<T> {
    waitingEnqueue = new Array<() => void>()
    waitingDequeue = new Array<() => void>()
    enqueuePointer = 0
    dequeuePointer = 0
    queue = Array<T>()
    maxSize = 1
    trace = 0

    async enqueue(x: T) {
        this.trace += 1
        const localTrace = this.trace

        if ((this.queue.length + 1) > this.maxSize || this.waitingDequeue.length > 0) {
            console.debug(`[${localTrace}] Producer Waiting`)
            this.dequeuePointer += 1
            await new Promise(r => this.waitingDequeue.unshift(r))
            this.waitingDequeue.pop()
            console.debug(`[${localTrace}] Producer Ready`)
        }

        this.queue.unshift(x)
        console.debug(`[${localTrace}] Enqueueing ${x} Queue is now [${this.queue.join(', ')}]`)

        if (this.enqueuePointer > 0) {
            console.debug(`[${localTrace}] Notify Consumer`)
            this.waitingEnqueue[this.enqueuePointer-1]()
            this.enqueuePointer -= 1
        }
    }

    async dequeue() {
        this.trace += 1
        const localTrace = this.trace

        console.debug(`[${localTrace}] Queue length before pop: ${this.queue.length}`)

        if (this.queue.length == 0 || this.waitingEnqueue.length > 0) {
            console.debug(`[${localTrace}] Consumer Waiting`)
            this.enqueuePointer += 1
            await new Promise(r => this.waitingEnqueue.unshift(r))
            this.waitingEnqueue.pop()
            console.debug(`[${localTrace}] Consumer Ready`)
        }

        const x = this.queue.pop()!
        console.debug(`[${localTrace}] Queue length after pop: ${this.queue.length} Popping ${x}`)

        if (this.dequeuePointer > 0) {
            console.debug(`[${localTrace}] Notify Producer`)
            this.waitingDequeue[this.dequeuePointer - 1]()
            this.dequeuePointer -= 1
        }

        return x
    }
}

Update: Here's a clean version using an AsyncSemaphore, that really encapsulates the way things are usually done using concurrency primitives, but adapted to the asynchronous-CPS-single-threaded-event-loop™ style of JavaScript with async/await. You can see that the logic of AsyncQueue becomes much more intuitive, and the double synchronisation through Promises is delegated to the two semaphores:

class AsyncSemaphore {
    private promises = Array<() => void>()

    constructor(private permits: number) {}

    signal() {
        this.permits += 1
        if (this.promises.length > 0) this.promises.pop()()
    }

    async wait() {
        if (this.permits == 0 || this.promises.length > 0)
            await new Promise(r => this.promises.unshift(r))
        this.permits -= 1
    }
}

class AsyncQueue<T> {
    private queue = Array<T>()
    private waitingEnqueue: AsyncSemaphore
    private waitingDequeue: AsyncSemaphore

    constructor(readonly maxSize: number) {
        this.waitingEnqueue = new AsyncSemaphore(0)
        this.waitingDequeue = new AsyncSemaphore(maxSize)
    }

    async enqueue(x: T) {
        await this.waitingDequeue.wait()
        this.queue.unshift(x)
        this.waitingEnqueue.signal()
    }

    async dequeue() {
        await this.waitingEnqueue.wait()
        this.waitingDequeue.signal()
        return this.queue.pop()!
    }
}

Update 2: There seemed to be a subtle bug hidden in the above code, that became evident when trying to use an AsyncQueue of size 0. The semantics do make sense: it is a queue without any buffer, where the publisher always awaits for an consumer to exist. The lines that were preventing it to work were:

await this.waitingEnqueue.wait()
this.waitingDequeue.signal()

If you look closely, you'll see that dequeue() isn't perfectly symmetric to enqueue(). In fact, if one swaps the order of these two instructions:

this.waitingDequeue.signal()
await this.waitingEnqueue.wait()

Then all works again; it seems intuitive to me that we signal that there's something interested in dequeuing() before actually waiting for an enqueuing to take place.

I'm still not sure this doesn't reintroduce subtle bugs, without extensive testing. I'll leave this as a challenge ;)

like image 162
Hugo Sereno Ferreira Avatar answered Oct 12 '22 23:10

Hugo Sereno Ferreira