I'm looking for a queue that would be the asynchronous (non-blocking) equivalent of java.util.concurrent.BlockingQueue
. Its interface would include:
public interface AsynchronousBlockingQueue<E> {
// - if the queue is empty, return a new CompletableFuture,
// that will be completed next time `add` is called
// - if the queue is not empty, return a completed CompletableFuture,
containing the first element of the list
public CompletableFuture<E> poll();
// if polling is in progress, complete the ongoing polling CompletableFuture.
// otherwise, add the element to the queue
public synchronized void add(E element);
}
If that matters, there should be just one poller thread, and polling should be done sequentially (poll
will not be called when polling is already in progress).
I expected it to already exist in the JVM, but I couldn't find it, and of course I'd rather use something from the JVM than write it myself.
Another constraint, I'm stuck with Java 8 (even though I'm definitely interested in knowing what exists in more recent versions).
BlockingQueue is a java Queue that support operations that wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element.
Here we have a blockingQueue that has a capacity equal to 10. It means that when a producer tries to add an element to an already full queue, depending on a method that was used to add it (offer(), add() or put()), it will block until space for inserting object becomes available. Otherwise, the operations will fail.
No, you do not need to synchronize access to the object properties, or even use volatile on the member variables. All actions performed by a thread before it queues an object on a BlockingQueue "happen-before" the object is dequeued. That means that any changes made by the first thread are visible to the second.
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.
So finally I wrote my own class... Interested in comments :)
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
public class AsynchronousBlockingQueue<E> {
CompletableFuture<E> incompletePolling = null;
Queue<E> elementsQueue = new LinkedList<>();
// if the queue is empty, return a new CompletableFuture, that will be completed next time `add` is called
// if the queue is not empty, return a completed CompletableFuture containing the first element of the list
public synchronized CompletableFuture<E> poll() {
// polling must be done sequentially, so this shouldn't be called if there is a poll ongoing.
if (incompletePolling != null)
throw new IllegalStateException("Polling is already ongoing");
if (elementsQueue.isEmpty()) {
incompletePolling = new CompletableFuture<>();
return incompletePolling;
}
CompletableFuture<E> result = new CompletableFuture<>();
result.complete(elementsQueue.poll());
return result;
}
// if polling is in progress, complete the ongoing polling CompletableFuture.
// otherwise, add the element to the queue
public synchronized void add(E element) {
if (incompletePolling != null) {
CompletableFuture<E> result = incompletePolling;
// removing must be done first because the completion could trigger code that needs the queue state to be valid
incompletePolling = null;
result.complete(element);
return;
}
elementsQueue.add(element);
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With