PriorityBlockingQueue stream in Java 8 is out of order

These two pieces of code produce their output in different orders. First piece:

while(!jobQueue.isEmpty()) {
    TimeoutJobRequest job = jobQueue.peek();
    if(job.isReady()) {
        execute(job);
        jobQueue.poll();
    } else {
        return;
    }
}

Second piece:

jobQueue.stream()
        .filter(TimeoutJobRequest::isReady)
        .peek(jobQueue::remove)
        .forEach(this::execute);

Note that jobQueue is a PriorityBlockingQueue.

The reordering occurs only when this::execute takes relatively long (a couple of seconds, for example).

jack2684 asked Aug 04 '15 00:08


2 Answers

The stream of a PriorityBlockingQueue follows the Iterator order, and according to the documentation:

The Iterator provided in method iterator() is not guaranteed to traverse the elements of the PriorityBlockingQueue in any particular order.

If you want the priority order, you need to poll the elements from the PriorityBlockingQueue.

PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
pq.add(5);
pq.add(8);
pq.add(3);

System.out.println("-- Try 1 --");
pq.stream().forEach(System.out::println);

System.out.println("-- Try 2 --");
IntStream.range(0, pq.size()).map(i -> pq.poll()).forEach(System.out::println);

Output (the order in Try 1 may depend on the Java implementation):

-- Try 1 --
3
8
5
-- Try 2 --
3
5
8
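
If the queue must stay intact, one possible alternative to polling is to sort a snapshot of the stream. The sketch below assumes the queue uses natural ordering (as in the Integer example above); with a custom Comparator you would pass the same comparator to sorted(...). The class name SortedQueueView and the helper sortedSnapshot are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.stream.Collectors;

public class SortedQueueView {

    // Hypothetical helper: returns the queue's current elements in natural
    // order. It only reads the queue, so nothing is removed.
    public static List<Integer> sortedSnapshot(PriorityBlockingQueue<Integer> queue) {
        return queue.stream()
                .sorted() // assumption: natural ordering matches the queue's ordering
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
        pq.add(5);
        pq.add(8);
        pq.add(3);

        System.out.println(sortedSnapshot(pq)); // prints [3, 5, 8]
        System.out.println(pq.size());          // prints 3, the queue is untouched
    }
}
```

This costs an extra sort on every call, so it is probably only worthwhile when the elements have to remain in the queue.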

Helder Pereira answered Nov 16 '22 13:11


If you want to create a stream that follows the queue's priority order, you can try the following code (note that it empties the queue):

Stream.generate(jobQueue::poll).limit(jobQueue.size())
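
For a version that can be run as-is, here is a minimal sketch of the same idea with an Integer queue. The class name DrainInPriorityOrder and the helper drain are hypothetical. It assumes no other thread touches the queue while the stream runs (otherwise poll() may return null) and that the stream stays sequential, since Stream.generate is documented as unordered.

```java
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DrainInPriorityOrder {

    // Hypothetical helper: polls every element currently in the queue,
    // head first, and collects the results into a list.
    public static List<Integer> drain(PriorityBlockingQueue<Integer> queue) {
        int count = queue.size();              // size is read once, before polling starts
        Supplier<Integer> next = queue::poll;  // naming the type makes the element type explicit
        return Stream.generate(next)
                .limit(count)                  // stop after the elements that were present
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
        pq.add(5);
        pq.add(8);
        pq.add(3);

        System.out.println(drain(pq));    // prints [3, 5, 8]
        System.out.println(pq.isEmpty()); // prints true, the queue has been emptied
    }
}
```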

Marcin Król answered Nov 16 '22 12:11