Concurrent Set Queue

Maybe this is a silly question, but I cannot seem to find an obvious answer.

I need a concurrent FIFO queue that contains only unique values. An attempt to add a value that already exists in the queue should simply be ignored. If not for the thread safety, this would be trivial. Is there a data structure in Java, or maybe a code snippet somewhere online, that exhibits this behavior?

Steve Skrla asked Jun 25 '10 18:06


1 Answer

If you want better concurrency than full synchronization, there is one way I know of to do it, using a ConcurrentHashMap as the backing map. The following is a sketch only.

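    import com.google.common.base.Preconditions;
    import com.google.common.collect.ForwardingSet;

    import java.util.Collection;
    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.Queue;
    import java.util.Set;
    import java.util.concurrent.ConcurrentMap;

    public final class ConcurrentHashSet<E> extends ForwardingSet<E>
        implements Set<E>, Queue<E> {
      private enum Dummy { VALUE }

      private final ConcurrentMap<E, Dummy> map;

      ConcurrentHashSet(ConcurrentMap<E, Dummy> map) {
        this.map = Preconditions.checkNotNull(map);
      }

      // ForwardingSet forwards all other Set methods to this delegate.
      @Override protected Set<E> delegate() {
        return map.keySet();
      }

      @Override public boolean add(E element) {
        // put() returns null only if the element was not already present
        return map.put(element, Dummy.VALUE) == null;
      }

      @Override public boolean addAll(Collection<? extends E> newElements) {
        // just the standard implementation
        boolean modified = false;
        for (E element : newElements) {
          modified |= add(element);
        }
        return modified;
      }

      @Override public boolean offer(E element) {
        return add(element);
      }

      @Override public E remove() {
        E polled = poll();
        if (polled == null) {
          throw new NoSuchElementException();
        }
        return polled;
      }

      @Override public E poll() {
        for (E element : this) {
          // Not convinced that removing via iterator is viable (check this?)
          // map.remove() succeeds for only one thread, so each element is handed out once
          if (map.remove(element) != null) {
            return element;
          }
        }
        return null;
      }

      @Override public E element() {
        // throws NoSuchElementException if the set is empty
        return iterator().next();
      }

      @Override public E peek() {
        Iterator<E> iterator = iterator();
        return iterator.hasNext() ? iterator.next() : null;
      }
    }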

All is not sunshine with this approach. We have no decent way to select a head element other than taking the first element from the backing map's iterator (for example, entrySet().iterator().next()), so elements come out in hash-table order rather than strict insertion order. The result is that the map gets more and more unbalanced as time goes on. This unbalancing is a problem both due to greater bucket collisions and greater segment contention.

Note: this code uses Guava in a few places.
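
For comparison, here is a minimal sketch of the fully synchronized baseline that the ConcurrentHashMap approach is trying to improve on. It is not part of the code above: the class name SynchronizedUniqueQueue and its method set are hypothetical, chosen for illustration. It assumes a java.util.LinkedHashSet as the backing store, which rejects duplicates and iterates in insertion order, so the head is always the oldest element. Every operation takes the same lock, so it trades concurrency for simplicity and true FIFO ordering.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Objects;

/** Hypothetical name: a fully synchronized FIFO queue that ignores duplicates. */
public final class SynchronizedUniqueQueue<E> {
    // LinkedHashSet rejects duplicates and iterates in insertion order.
    private final LinkedHashSet<E> elements = new LinkedHashSet<>();

    /** Appends the element at the tail; returns false if it is already queued. */
    public synchronized boolean offer(E element) {
        // Reject null so that a null result from poll() always means "empty".
        Objects.requireNonNull(element);
        return elements.add(element);
    }

    /** Removes and returns the oldest element, or null if the queue is empty. */
    public synchronized E poll() {
        Iterator<E> it = elements.iterator();
        if (!it.hasNext()) {
            return null;
        }
        E head = it.next();
        it.remove();
        return head;
    }

    /** Returns the number of queued elements. */
    public synchronized int size() {
        return elements.size();
    }
}
```

Once an element has been polled it can be offered again, since uniqueness applies only to what is currently in the queue.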

Kevin Bourrillion answered Oct 16 '22 21:10