Given the following variation of a queue:
import java.util.function.Consumer;

interface AsyncQueue<T> {
    // Add a new element to the queue.
    void add(T elem);

    // Request a single element from the queue via a callback.
    // The callback is called once, with a single polled element, when one is available,
    // so requesting multiple elements means calling poll() multiple times,
    // possibly with different callbacks.
    void poll(Consumer<T> callback);
}
I found out that I do not know how to implement it using java.util.concurrent primitives! So the questions are:
An asynchronous queue is like a standard queue, except that when you dequeue an element from an empty queue, the computation blocks instead of failing.
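To make the "blocks instead of failing" distinction concrete, here is a minimal Java sketch. The class name BlockingVsFailing and its two helper methods are hypothetical and exist only for illustration; the queue classes themselves are standard java.util and java.util.concurrent types.

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the question's code.
class BlockingVsFailing {
    // A plain queue fails immediately when it is empty:
    // remove() throws NoSuchElementException.
    static boolean failsWhenEmpty() {
        try {
            new ArrayDeque<String>().remove();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    // A blocking queue waits instead: take() returns only after
    // another thread has put an element.
    static String takeAfterDelay(String elem, long delayMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                queue.put(elem);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String result = queue.take(); // blocks until the producer delivers
            producer.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

The callback-based poll in the question is a third option: it neither fails nor blocks the caller, and delivers the element later.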
A queue object can be created from an asynchronous function, which is passed in as the worker. The worker takes two parameters: first, the task to be performed, and second, the callback function.
asyncio queues are designed to be similar to classes of the queue module. Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code. Note that methods of asyncio queues don't have a timeout parameter; use asyncio.
Your AsyncQueue is very similar to a BlockingQueue such as ArrayBlockingQueue. The Future returned would simply delegate to the ArrayBlockingQueue methods. Future.get would call blockingQueue.poll, for instance.
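A minimal sketch of that delegation might look like the following. The class name FutureQueue, its capacity parameter, and the caching of the first result are assumptions made for illustration; they are not taken from the question. Here the untimed Future.get() maps to take(), because it must block, while the timed get maps to poll(timeout, unit).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical class: a queue whose poll() returns a Future backed by a BlockingQueue.
class FutureQueue<T> {
    private final BlockingQueue<T> queue;

    FutureQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void add(T elem) {
        queue.add(elem); // throws IllegalStateException if the queue is full
    }

    Future<T> poll() {
        return new Future<T>() {
            private final Object lock = new Object();
            private T value;               // cached so repeated get() calls agree
            private volatile boolean done;

            @Override
            public T get() throws InterruptedException {
                synchronized (lock) {
                    if (!done) {
                        value = queue.take(); // blocks until an element arrives
                        done = true;
                    }
                    return value;
                }
            }

            @Override
            public T get(long timeout, TimeUnit unit)
                    throws InterruptedException, TimeoutException {
                synchronized (lock) {
                    if (!done) {
                        T polled = queue.poll(timeout, unit); // null means timed out
                        if (polled == null) throw new TimeoutException();
                        value = polled;
                        done = true;
                    }
                    return value;
                }
            }

            // Cancellation is not supported in this sketch.
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) { return false; }

            @Override
            public boolean isCancelled() { return false; }

            @Override
            public boolean isDone() { return done; }
        };
    }
}
```

Note that in this sketch an element is claimed only when get is called, so the Futures are served in the order they are read, not the order in which poll() was called.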
As for your update, I'm assuming the thread that calls add should invoke the callback if there's one waiting? If so, it's a simple task of creating one queue for elements and one queue for callbacks.
Code outline:
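import java.util.LinkedList;
import java.util.Queue;
import java.util.function.Consumer;

class AsyncQueue<E> {
    // Callbacks waiting for an element, in arrival order.
    private final Queue<Consumer<E>> callbackQueue = new LinkedList<>();
    // Elements waiting for a callback, in arrival order.
    private final Queue<E> elementQueue = new LinkedList<>();

    // Hand the element to the oldest waiting callback, or store it if none is waiting.
    public synchronized void add(E e) {
        if (!callbackQueue.isEmpty())
            callbackQueue.remove().accept(e);
        else
            elementQueue.offer(e);
    }

    // Deliver the oldest stored element to the callback, or park the callback if there is none.
    public synchronized void poll(Consumer<E> c) {
        if (!elementQueue.isEmpty())
            c.accept(elementQueue.remove());
        else
            callbackQueue.offer(c);
    }
}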
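The outline runs each callback while holding the queue's lock, so a slow callback stalls every other caller. One possible variant, sketched here under assumptions, picks the callback or element inside the lock and invokes the callback outside it. The names CallbackQueue and CallbackQueueDemo are hypothetical, and the sketch uses ArrayDeque, which rejects null elements.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical variant of the outline: same two queues, callbacks run outside the lock.
class CallbackQueue<E> {
    private final Queue<Consumer<E>> callbacks = new ArrayDeque<>();
    private final Queue<E> elements = new ArrayDeque<>();

    public void add(E e) {
        Consumer<E> waiting;
        synchronized (this) {
            waiting = callbacks.poll(); // null if no callback is waiting
            if (waiting == null) {
                elements.add(e);        // store the element for a later poll()
                return;
            }
        }
        waiting.accept(e);              // user code runs without holding the lock
    }

    public void poll(Consumer<E> c) {
        E ready;
        synchronized (this) {
            ready = elements.poll();    // null if no element is stored
            if (ready == null) {
                callbacks.add(c);       // park the callback for a later add()
                return;
            }
        }
        c.accept(ready);                // user code runs without holding the lock
    }
}

class CallbackQueueDemo {
    public static void main(String[] args) {
        CallbackQueue<String> queue = new CallbackQueue<>();
        List<String> seen = new ArrayList<>();
        queue.poll(s -> seen.add("first:" + s));  // parked: queue is empty
        queue.add("a");                           // delivered to the parked callback
        queue.add("b");                           // stored: no callback waiting
        queue.poll(s -> seen.add("second:" + s)); // receives "b" immediately
        System.out.println(seen);                 // prints [first:a, second:b]
    }
}
```

In both versions the callback runs on whichever thread completes the match: the thread calling add if a callback was waiting, otherwise the thread calling poll.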