I know that the documentation says that the object is thread safe, but does that mean that all access to it from all methods is thread safe? So if I call put() on it from many threads at once, and take() on it at the same instant, will nothing bad happen?
I ask because this answer is making me second-guess that: https://stackoverflow.com/a/22006181/4164238
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.
BlockingQueue is a Java Queue that supports operations that wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element.
Suppose we have a BlockingQueue with a capacity of 10. When a producer tries to add an element to an already full queue, the outcome depends on which method is used: put() blocks until space becomes available, offer() returns false immediately (or after the timeout, for the timed variant), and add() throws an IllegalStateException.
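For example, here is a minimal sketch of those three behaviours against a full queue (the class name, the capacity of 10 and the element values are just for illustration):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class FullQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
        for (int i = 0; i < 10; i++) {
            queue.put(i);                    // fill the queue to capacity
        }

        System.out.println(queue.offer(42)); // false: offer() fails immediately on a full queue
        System.out.println(queue.offer(42, 100, TimeUnit.MILLISECONDS)); // false after a bounded wait

        try {
            queue.add(42);                   // add() throws on a full queue
        } catch (IllegalStateException e) {
            System.out.println("add() threw: " + e);
        }

        // queue.put(42) would block here until another thread take()s an element
    }
}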
PriorityBlockingQueue is thread safe. The Iterator provided in method iterator() is not guaranteed to traverse the elements of the PriorityBlockingQueue in any particular order.
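A small sketch of that last point (the class name and element values are arbitrary):

import java.util.concurrent.PriorityBlockingQueue;

public class PriorityIterationDemo {
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.addAll(java.util.List.of(5, 1, 4, 2, 3));

        // the iterator walks the internal heap array, so the order is not necessarily sorted
        for (Integer i : queue) System.out.print(i + " ");
        System.out.println();

        // take(), by contrast, always returns the smallest remaining element: 1 2 3 4 5
        while (!queue.isEmpty()) System.out.print(queue.take() + " ");
    }
}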
The quick answer is yes, they are thread safe. But let's not leave it there...
Firstly, a little housekeeping: BlockingQueue is an interface, and any implementation that is not thread safe would be breaking the documented contract. The link that you included was referring to LinkedBlockingQueue, which has some cleverness to it.
The link that you included makes an interesting observation: yes, there are two locks within LinkedBlockingQueue. However, it fails to recognize that the edge case a 'simple' implementation would have fallen foul of is in fact handled, which is why the take and put methods are more complicated than one would at first expect.
LinkedBlockingQueue is optimized to avoid using the same lock on both reading and writing. This reduces contention, but for correct behaviour it relies on the queue not being empty. When the queue has elements within it, the put and take points are not at the same region of memory and contention can be avoided. However, when the queue is empty the contention cannot be avoided, and so extra code is required to handle this common 'edge' case. This is a common trade-off between code complexity and performance/scalability.
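None of this internal cleverness leaks out to callers. As a minimal sketch (the thread counts, capacity and totals are arbitrary), the following program hammers one LinkedBlockingQueue with concurrent put() and take() calls and checks that every element is taken exactly once:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentPutTakeDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
        int producers = 4, consumers = 4, perProducer = 100_000;
        AtomicLong consumedSum = new AtomicLong();
        CountDownLatch done = new CountDownLatch(consumers);

        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                try {
                    for (int i = 1; i <= perProducer; i++) queue.put(i); // blocks when full
                } catch (InterruptedException ignored) { }
            }).start();
        }
        for (int c = 0; c < consumers; c++) {
            new Thread(() -> {
                try {
                    for (int i = 0; i < producers * perProducer / consumers; i++)
                        consumedSum.addAndGet(queue.take());            // blocks when empty
                } catch (InterruptedException ignored) { }
                done.countDown();
            }).start();
        }

        done.await();
        // each producer puts 1..perProducer, so the consumed total is fixed
        long expected = (long) producers * perProducer * (perProducer + 1) / 2;
        System.out.println(consumedSum.get() == expected); // true
    }
}

The final check passes regardless of how the threads interleave, which is exactly the guarantee the question is asking about.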
The question then follows: how does LinkedBlockingQueue know when the queue is empty/not empty, and thus handle the threading? The answer is that it uses an AtomicInteger and a Condition as two extra concurrent data structures. The AtomicInteger is used to check whether the length of the queue is zero, and the Condition is used to wait for a signal that notifies a waiting thread when the queue is probably in the desired state. This extra coordination does have an overhead; however, measurements have shown that when ramping up the number of concurrent threads, the overheads of this technique are lower than the contention that is introduced by using a single lock.
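Concretely, the coordination state inside LinkedBlockingQueue looks roughly like this (field names as in the OpenJDK source; the enclosing class and imports are omitted):

// the element count, shared between put() and take() without holding either lock
private final AtomicInteger count = new AtomicInteger();

// lock and condition guarding the take()/poll() side (the head of the queue)
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();

// lock and condition guarding the put()/offer() side (the tail of the queue)
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();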
Below I have copied the code from LinkedBlockingQueue and added comments explaining how they work. At a high level, take() first locks out all other calls to take() and then signals put() as necessary. put() works in a similar way: first it blocks out all other calls to put(), and then signals take() if necessary.
From the put() method:
// putLock coordinates the calls to put() only; further coordination
// between put() and take() follows below
putLock.lockInterruptibly();
try {
    // block while the queue is full; count is shared between put() and take()
    // and is safely visible between cores but prone to change between calls.
    // a while loop is used because state can change between signals, which is
    // why signals get rechecked and resent.. read on to see more of that
    while (count.get() == capacity) {
        notFull.await();
    }
    // we know that the queue is not full so add
    enqueue(e);
    c = count.getAndIncrement();
    // if the queue is not full, send a signal to wake up any thread that is
    // possibly waiting for the queue to be a little emptier -- note that this
    // is logically part of 'take()' but it has to be here because take()
    // blocks itself
    if (c + 1 < capacity)
        notFull.signal();
} finally {
    putLock.unlock();
}
if (c == 0)
    signalNotEmpty();
From the take() method:

takeLock.lockInterruptibly();
try {
    // wait for the queue to stop being empty
    while (count.get() == 0) {
        notEmpty.await();
    }
    // remove element
    x = dequeue();
    // decrement shared count
    c = count.getAndDecrement();
    // send signal that the queue is not empty
    // note that this is logically part of put(), but
    // for thread coordination reasons is here
    if (c > 1)
        notEmpty.signal();
} finally {
    takeLock.unlock();
}
if (c == capacity)
    signalNotFull();
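The signalNotEmpty()/signalNotFull() calls at the end of each method are the cross-lock handoff: put() briefly acquires takeLock to wake a waiting consumer, and take() briefly acquires putLock to wake a waiting producer, but only on the empty-to-non-empty and full-to-non-full transitions. In the OpenJDK source they look roughly like this:

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}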