
Non-Blocking Concurrent Queue with offer and flush

I need an unbounded, non-blocking concurrent queue with essentially only two operations:

  • offer: atomically inserts the specified item at the tail of the queue;
  • flush: atomically takes all the items in the queue at that moment and then processes them one by one in insertion order. Only this initial "takeAll" step, the very first operation of the flush, has to be atomic. Items offered to the queue after the takeAll are inserted as usual and processed only by a subsequent flush.

The goal is for the consumer to perform a single CAS operation in takeAll and then iterate over the elements without a CAS operation per read. Furthermore, we already own the node (Entry), since it is needed to store some other immutable state. A new node can take the current HEAD as a constructor argument, creating a singly linked list.
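The design described above can be sketched roughly as follows. This is only an illustration under stated assumptions: the `Entry` class, its `payload` field, and the `EntryQueue` wrapper are hypothetical names, and the link is kept in a mutable `next` field (set just before publication) rather than passed to a constructor, so that a caller-owned node can be reused across CAS retries. The consumer detaches the whole chain with one atomic swap and reverses it locally to recover insertion order.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical caller-owned node; "payload" stands in for the immutable state.
final class Entry {
    final String payload;
    // Written by the producer before the node is published, and by the
    // consumer after takeAll() has detached the chain.
    Entry next;

    Entry(String payload) {
        this.payload = payload;
    }
}

// Hypothetical queue holding only a reference to the most recently offered entry.
final class EntryQueue {
    private final AtomicReference<Entry> head = new AtomicReference<>();

    // Producer side: link the entry to the current head, then try to publish it.
    // Assumption: an Entry is offered at most once.
    void offer(Entry e) {
        Entry current;
        do {
            current = head.get();
            e.next = current;
        } while (!head.compareAndSet(current, e));
    }

    // Consumer side: one atomic swap detaches everything offered so far.
    // The chain is newest-first, so it is reversed in place and the oldest
    // entry is returned; the caller then follows "next" with plain reads.
    Entry takeAll() {
        Entry newest = head.getAndSet(null);
        Entry oldest = null;
        while (newest != null) {
            Entry older = newest.next;
            newest.next = oldest;
            oldest = newest;
            newest = older;
        }
        return oldest;
    }
}
```

After `takeAll()` returns, a loop such as `for (Entry e = first; e != null; e = e.next)` visits the entries in insertion order without any further atomic operation. Entries offered concurrently with the swap end up either in this batch or in the next one, never in both.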

Does a queue with these characteristics exist in the literature?

Mario Fusco asked Apr 21 '15 17:04




1 Answer

Here you go:

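import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class FunkyQueue<T> {
    // Most recently offered node; each node links back to the one offered before it.
    private final AtomicReference<Node<T>> _tail = new AtomicReference<Node<T>>();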

    public void offer(T t) {
        while(true) {
            Node<T> tail = _tail.get();
            Node<T> newTail = new Node<T>(t, tail);
            if(_tail.compareAndSet(tail, newTail)) {
                break;
            }
        }
    }

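    public List<T> takeAll() {
        // Single atomic swap: detach everything offered so far.
        Node<T> tail = _tail.getAndSet(null);

        // Walk newest-to-oldest, prepending so the result is in insertion order.
        LinkedList<T> list = new LinkedList<T>();
        while(tail != null) {
            list.addFirst(tail.get());
            tail = tail.getPrevious();
        }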

        return list;
    }

    private static final class Node<T>
    {
        private final T _obj;
        private final Node<T> _prev;

        private Node(T obj, Node<T> prev) {
            _obj = obj;
            _prev = prev;
        }

        public T get() {
            return _obj;
        }

        public Node<T> getPrevious() {
            return _prev;
        }
    }
}
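The class above provides takeAll but leaves flush to the caller. As a rough sketch of how flush might be layered on the same idea, the following assumes a hypothetical `FlushableQueue` class and a `java.util.function.Consumer` callback for processing; neither is part of the original answer, and the sketch assumes items are non-null because `ArrayDeque` rejects null elements.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical variant of the queue above with a built-in flush.
final class FlushableQueue<T> {
    private static final class Node<T> {
        final T item;
        final Node<T> prev;

        Node(T item, Node<T> prev) {
            this.item = item;
            this.prev = prev;
        }
    }

    // Most recently offered node, or null when nothing is pending.
    private final AtomicReference<Node<T>> tail = new AtomicReference<>();

    // Retry the CAS until this item becomes the new tail.
    void offer(T item) {
        Node<T> current;
        do {
            current = tail.get();
        } while (!tail.compareAndSet(current, new Node<>(item, current)));
    }

    // Detach all pending items with one atomic swap, then hand them to
    // the callback in insertion order. Returns the number of items processed.
    int flush(Consumer<? super T> action) {
        Node<T> node = tail.getAndSet(null);
        ArrayDeque<T> ordered = new ArrayDeque<>();
        for (; node != null; node = node.prev) {
            ordered.addFirst(node.item); // the chain is newest-first
        }
        for (T item : ordered) {
            action.accept(item);
        }
        return ordered.size();
    }
}
```

A call such as `queue.flush(item -> handle(item))`, where `handle` is a placeholder for the caller's processing, sees only the items present at the moment of the swap; anything offered while the callback runs is left for the next flush.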
jtahlborn answered Nov 07 '22 19:11
