Assume I have the class field of ConcurrentLinkedQueue
type. Some methods of this class are offering the new elements to this queue. And some other methods need to poll for all the elements that are inside queue at this exact moment.
I can't use poll()
in loop, because there is a chance that some elements might be offered to this queue while loop is still unfinished. And if the new elements are offered faster than I poll them, I think it can even be an endless loop. So I need some kind of pollAll()
.
Is there a way I can achieve this? Maybe there is a collection suitable for this?
If you can change your application to use one of the BlockingQueue
implementations, there is a method drainTo
that seems to do exactly what you want. It removes the current contents of the queue and transfers them to a destination collection.
There are a variety of BlockingQueue
implementations; they should all be thread-safe. Oddly, it's not specified that drainTo
is atomic, though it is in the implementations I checked (ArrayBlockingQueue
and LinkedBlockingQueue
).
It seems like you need some sort of "pause" moment. One way of doing:
AbstractQueue<Object> queue = new ConcurrentLinkedQueue<>();
int size = queue.size();
for (int i = 0; i < size; i++) {
Object object = queue.poll();
if (object == null) {
// Collection has shronk break
break;
}
// Do processing here
}
By storing size into a local variable, the size will not change and you can use that amount of elements to process. If during process elements are added it won't be affected with an endless loop.
UPDATE: The .iterator()
is probably better to use than my first example:
The returned iterator is a "weakly consistent" iterator that will never throw ConcurrentModificationException, and guarantees to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.
UPDATE 2: This is the method that will take all the elements also the deleted once and process it.
AbstractQueue<Object> queue = new ConcurrentLinkedQueue<>();
queue.add("Test1");
queue.add("Test2");
queue.add("Test3");
Object[] objList = queue.toArray();
queue.remove("Test2");
for (Object obj : objList) {
// Make sure you delete it, because we don't use .poll
// Put it at top, to reproduce the poll as much as possible
queue.remove(obj);
// Do processing
}
This will output:
Test1
Test2
Test3
Because of the copy to the new objList[]
it will also show "Test2". This example may cause issues with the .remove()
method if duplicates are added into the list. Because if and element is added is duplicate to the object in .remove()
then that object is instantly removed.
Also note that this approach is slower, because .remove()
need to loop through elements to find it O(N1), where .poll
is instant O(1).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With