
Java - ConcurrentLinkedQueue - poll all

Assume I have a class field of type ConcurrentLinkedQueue. Some methods of this class offer new elements to the queue, and other methods need to poll all the elements that are in the queue at that exact moment.

I can't use poll() in a loop, because elements might be offered to the queue while the loop is still running. If new elements are offered faster than I poll them, I think the loop might even never end. So I need some kind of pollAll().

Is there a way I can achieve this? Maybe there is a collection suitable for this?

coolguy asked Mar 14 '23 16:03



2 Answers

If you can change your application to use one of the BlockingQueue implementations, there is a method drainTo that seems to do exactly what you want. It removes the current contents of the queue and transfers them to a destination collection.

There are a variety of BlockingQueue implementations; they should all be thread-safe. Oddly, it's not specified that drainTo is atomic, though it is in the implementations I checked (ArrayBlockingQueue and LinkedBlockingQueue).
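A minimal sketch of the drainTo approach, assuming a LinkedBlockingQueue is an acceptable replacement for the ConcurrentLinkedQueue. The class name DrainToSketch and the helper drainAll are made up for illustration; only drainTo itself comes from the BlockingQueue API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DrainToSketch {
    // Hypothetical helper: moves everything currently available in the queue
    // into a fresh list and returns it.
    static <T> List<T> drainAll(BlockingQueue<T> queue) {
        List<T> drained = new ArrayList<>();
        queue.drainTo(drained); // removes the elements from the queue
        return drained;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("a");
        queue.offer("b");

        List<String> batch = drainAll(queue);
        queue.offer("c"); // offered after the drain, so it stays in the queue

        System.out.println(batch); // prints [a, b]
        System.out.println(queue); // prints [c]
    }
}
```

Elements offered after drainTo returns simply wait for the next drain, which is the "everything that is in the queue right now" behavior the question asks for.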

Stuart Marks answered Mar 23 '23 03:03



It seems like you need some sort of "pause" moment. One way of doing it:

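    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    Queue<Object> queue = new ConcurrentLinkedQueue<>();
    // Snapshot the element count. Note that size() is O(n) for
    // ConcurrentLinkedQueue and only approximate under concurrent updates.
    int size = queue.size();

    for (int i = 0; i < size; i++) {
        Object object = queue.poll();
        if (object == null) {
            // The queue has shrunk (another thread polled), so stop early
            break;
        }
        // Do processing here
    }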

Because the size is stored in a local variable, the loop bound does not change, and you process at most that many elements. Elements added during processing cannot turn this into an endless loop.
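The same idea can be wrapped in a reusable helper. This is only a sketch: the names PollAllSketch and pollAll are hypothetical, and because size() on a ConcurrentLinkedQueue may be inaccurate while other threads modify the queue, the bound is a best-effort snapshot rather than an atomic one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class PollAllSketch {
    // Hypothetical helper: polls at most as many elements as size()
    // reported on entry, so concurrent offers cannot keep the loop alive.
    static <T> List<T> pollAll(Queue<T> queue) {
        int limit = queue.size();
        List<T> drained = new ArrayList<>(limit);
        for (int i = 0; i < limit; i++) {
            T item = queue.poll();
            if (item == null) {
                break; // another consumer emptied the queue first
            }
            drained.add(item);
        }
        return drained;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("a");
        queue.offer("b");

        List<String> batch = pollAll(queue);
        queue.offer("c"); // arrives after the batch was taken

        System.out.println(batch); // prints [a, b]
        System.out.println(queue); // prints [c]
    }
}
```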

UPDATE: Using .iterator() is probably better than my first example. From the Javadoc:

The returned iterator is a "weakly consistent" iterator that will never throw ConcurrentModificationException, and guarantees to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.

UPDATE 2: This approach takes a snapshot of all the elements, including ones that are deleted afterwards, and processes them.

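    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    Queue<Object> queue = new ConcurrentLinkedQueue<>();
    queue.add("Test1");
    queue.add("Test2");
    queue.add("Test3");

    // Snapshot of the queue contents at this moment
    Object[] objList = queue.toArray();

    // Removing from the queue does not affect the snapshot
    queue.remove("Test2");

    for (Object obj : objList) {
        // Remove explicitly, because we don't use .poll();
        // do it first to mimic poll() as closely as possible
        queue.remove(obj);

        // Do processing (here: print the element)
        System.out.println(obj);
    }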

This will output:

Test1
Test2
Test3 

Because the elements were copied into objList, "Test2" is processed as well, even though it was removed from the queue. This approach can cause problems with .remove() if duplicates are added to the queue: if an element equal to the argument of .remove() is added in the meantime, that new element is removed right away.

Also note that this approach is slower, because .remove() has to scan the queue to find the element, which is O(n), whereas .poll() is O(1).

Albert Bos answered Mar 23 '23 02:03
