Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Asynchronous equivalent of BlockingQueue in Java?

I'm looking for a queue that would be the asynchronous (non-blocking) equivalent of java.util.concurrent.BlockingQueue. Its interface would include:

public interface AsynchronousBlockingQueue<E> {
    // - if the queue is empty, return a new CompletableFuture,
    //   that will be completed next time `add` is called
    // - if the queue is not empty, return a completed CompletableFuture,
         containing the first element of the list
    public CompletableFuture<E> poll();

    // if polling is in progress, complete the ongoing polling CompletableFuture.
    // otherwise, add the element to the queue
    public synchronized void add(E element);
}

If that matters, there should be just one poller thread, and polling should be done sequentially (poll will not be called when polling is already in progress).

I expected it to already exist in the JVM, but I couldn't find it, and of course I'd rather use something from the JVM than write it myself.

Another constraint, I'm stuck with Java 8 (even though I'm definitely interested in knowing what exists in more recent versions).

like image 967
yannick1976 Avatar asked Oct 17 '19 14:10

yannick1976


People also ask

What is BlockingQueue in Java?

BlockingQueue is a java Queue that support operations that wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element.

What is the max capacity of a Java BlockingQueue?

Here we have a blockingQueue that has a capacity equal to 10. It means that when a producer tries to add an element to an already full queue, depending on a method that was used to add it (offer(), add() or put()), it will block until space for inserting object becomes available. Otherwise, the operations will fail.

Is BlockingQueue synchronized?

No, you do not need to synchronize access to the object properties, or even use volatile on the member variables. All actions performed by a thread before it queues an object on a BlockingQueue "happen-before" the object is dequeued. That means that any changes made by the first thread are visible to the second.

Is Java BlockingQueue thread-safe?

BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.


1 Answers

So finally I wrote my own class... Interested in comments :)

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class AsynchronousBlockingQueue<E> {
    CompletableFuture<E> incompletePolling = null;
    Queue<E> elementsQueue = new LinkedList<>();

    // if the queue is empty, return a new CompletableFuture, that will be completed next time `add` is called
    // if the queue is not empty, return a completed CompletableFuture containing the first element of the list
    public synchronized CompletableFuture<E> poll() {
        // polling must be done sequentially, so this shouldn't be called if there is a poll ongoing.
        if (incompletePolling != null)
            throw new IllegalStateException("Polling is already ongoing");
        if (elementsQueue.isEmpty()) {
            incompletePolling = new CompletableFuture<>();
            return incompletePolling;
        }
        CompletableFuture<E> result = new CompletableFuture<>();
        result.complete(elementsQueue.poll());
        return result;
    }

    // if polling is in progress, complete the ongoing polling CompletableFuture.
    // otherwise, add the element to the queue
    public synchronized void add(E element) {
        if (incompletePolling != null) {
            CompletableFuture<E> result = incompletePolling;
            // removing must be done first because the completion could trigger code that needs the queue state to be valid
            incompletePolling = null;
            result.complete(element);
            return;
        }
        elementsQueue.add(element);
    }


}
like image 88
yannick1976 Avatar answered Oct 16 '22 10:10

yannick1976