Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement asynchronous queue?

Given following variation of queue:

interface AsyncQueue<T> {
    //add new element to the queue
    void add(T elem); 
    //request single element from the queue via callback
    //callback will be called once for single polled element when it is available
    //so, to request multiple elements, poll() must be called multiple times with (possibly) different callbacks
    void poll(Consumer<T> callback);
}

I found out i do not know how to implement it using java.util.concurrent primitives! So questions are:

  • What is the right way to implement it using java.util.concurrent package?
  • Is it possible to do this w/o using additional thread pool?
like image 322
Sergey Alaev Avatar asked Feb 25 '15 13:02

Sergey Alaev


People also ask

What is an asynchronous queue?

An asynchronous queue is like a standard queue, except that when you dequeue an element from an empty queue, the computation blocks instead of failing.

What are the two arguments that async queue takes?

A queue object based on an asynchronous function can be created which is passed as a worker. Task: Here, it takes two parameters, first — the task to be performed and second — the callback function.

Is Asyncio Threadsafe a queue?

asyncio queues are designed to be similar to classes of the queue module. Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code. Note that methods of asyncio queues don't have a timeout parameter; use asyncio.


1 Answers

Your AsyncQueue is very similar to a BlockingQueue such as ArrayBlockingQueue. The Future returned would simply delegate to the ArrayBlockingQueue methods. Future.get would call blockingQueue.poll for instance.


As for your update, I'm assuming the thread that calls add should invoke the callback if there's one waiting? If so it's a simple task of creating one queue for elements, and one queue for callbacks.

  • Upon add, check if there's a callback waiting, then call it, otherwise put the element on the element queue
  • Upon poll, check if there's an element waiting, then call the callback with that element, otherwise put the callback on the callback queue

Code outline:

class AsyncQueue<E> {

    Queue<Consumer<E>> callbackQueue = new LinkedList<>();
    Queue<E> elementQueue = new LinkedList<>();

    public synchronized void add(E e) {
        if (callbackQueue.size() > 0)
            callbackQueue.remove().accept(e);
        else
            elementQueue.offer(e);
    }

    public synchronized void poll(Consumer<E> c) {
        if (elementQueue.size() > 0)
            c.accept(elementQueue.remove());
        else
            callbackQueue.offer(c);
    }
}
like image 170
aioobe Avatar answered Oct 12 '22 12:10

aioobe