Encounter Order wrong when sorting a parallel stream

I have a Record class:

public class Record implements Comparable<Record>
{
   private String myCategory1;
   private int    myCategory2;
   private String myCategory3;
   private String myCategory4;
   private int    myValue1;
   private double myValue2;

   public Record(String category1, int category2, String category3, String category4,
      int value1, double value2)
   {
      myCategory1 = category1;
      myCategory2 = category2;
      myCategory3 = category3;
      myCategory4 = category4;
      myValue1 = value1;
      myValue2 = value2;
   }

   // Getters here
}

I create a large list of records. Only the second and fifth constructor arguments, i / 10000 and i, are used later, through the getters getCategory2() and getValue1() respectively.

List<Record> list = new ArrayList<>();
for (int i = 0; i < 115000; i++)
{
    list.add(new Record("A", i / 10000, "B", "C", i, (double) i / 100 + 1));
}

Note that the first 10,000 records have a category2 of 0, the next 10,000 have 1, and so on, while the value1 values run sequentially from 0 to 114999.

I create a Stream that is both parallel and sorted.

Stream<Record> stream = list.stream()
   .parallel()
   .sorted(
       //(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
   )
   //.parallel()
;

I have a ForkJoinPool that maintains 8 threads, which is the number of cores I have on my PC.

ForkJoinPool pool = new ForkJoinPool(8);

I use the trick described here to submit a stream processing task to my own ForkJoinPool instead of the common ForkJoinPool.

List<Record> output = pool.submit(() ->
    stream.collect(Collectors.toList())
).get();

I expected that the parallel sorted operation would respect the encounter order of the stream, and that it would be a stable sort, because the Spliterator returned by ArrayList is ORDERED.
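The ORDERED claim can be checked directly against the spliterator. The following is a minimal sketch, not code from the original program: the class name EncounterOrderCheck is hypothetical, and a list of integers stands in for the Record list. A HashSet is included only as a contrast, since its spliterator does not report ORDERED.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;

// Hypothetical helper, not part of the original question.
class EncounterOrderCheck {
    // True if the collection's spliterator reports a defined encounter order.
    static boolean isOrdered(Collection<?> source) {
        return source.spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            list.add(i);
        }
        System.out.println(isOrdered(list));                // ArrayList: true
        System.out.println(isOrdered(new HashSet<>(list))); // HashSet: false
    }
}
```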

However, simple code that prints out the elements of the resultant List output in order shows that it's not quite the case.

for (Record record : output)
{
     System.out.println(record.getValue1());
}

Output, condensed:

0
1
2
3
...
69996
69997
69998
69999
71875  // discontinuity!
71876
71877
71878
...
79058
79059
79060
79061
70000  // discontinuity!
70001
70002
70003
...
71871
71872
71873
71874
79062  // discontinuity!
79063
79064
79065
79066
...
114996
114997
114998
114999

The size() of output is 115000, and all elements appear to be there, just in a slightly different order.

So I wrote some checking code to see if the sort was stable. If it's stable, then all of the value1 values should remain in order. This code verifies the order, printing any discrepancies.

int prev = -1;
boolean verified = true;
for (Record record : output)
{
    int curr = record.getValue1();
    if (prev != -1)
    {
        if (prev + 1 != curr)
        {
            System.out.println("Warning: " + prev + " followed by " + curr + "!");
            verified = false;
        }
    }
    prev = curr;
}
System.out.println("Verified: " + verified);

Output:

Warning: 69999 followed by 71875!
Warning: 79061 followed by 70000!
Warning: 71874 followed by 79062!
Warning: 99999 followed by 100625!
Warning: 107811 followed by 100000!
Warning: 100624 followed by 107812!
Verified: false
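The check above works only because the value1 values are consecutive. A more general stability check needs just two conditions: the keys are non-decreasing, and elements with equal keys keep their original relative order. The following is a minimal sketch under assumptions: Item, key, seq, and StabilityCheck are hypothetical names standing in for Record, category2, and value1.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper, not part of the original question.
class StabilityCheck {
    // Stand-in for Record: key is the sort key, seq is the original position.
    static final class Item {
        final int key;
        final int seq;
        Item(int key, int seq) { this.key = key; this.seq = seq; }
    }

    // Builds items whose key is i / divisor and whose seq is i.
    static List<Item> generate(int size, int divisor) {
        List<Item> items = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            items.add(new Item(i / divisor, i));
        }
        return items;
    }

    // True if the list is sorted by key and equal keys keep increasing seq order.
    static boolean isStablySorted(List<Item> sorted) {
        for (int i = 1; i < sorted.size(); i++) {
            Item a = sorted.get(i - 1);
            Item b = sorted.get(i);
            if (a.key > b.key) {
                return false; // keys out of order: not sorted at all
            }
            if (a.key == b.key && a.seq > b.seq) {
                return false; // equal keys were reordered: not stable
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Item> items = generate(115_000, 10_000);
        // List.sort is documented as a stable sort, so this serves as a baseline.
        items.sort(Comparator.comparingInt((Item it) -> it.key));
        System.out.println(isStablySorted(items));
    }
}
```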

This condition persists if I do any of the following:

  • Replace the ForkJoinPool with a ThreadPoolExecutor.

    ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    
  • Use the common ForkJoinPool by processing the Stream directly.

    List<Record> output = stream.collect(Collectors.toList());
    
  • Call parallel() after I call sorted.

    Stream<Record> stream = list.stream().sorted().parallel();
    
  • Call parallelStream() instead of stream().parallel().

    Stream<Record> stream = list.parallelStream().sorted();
    
  • Sort using a Comparator. Note that this sort criterion is different from the "natural" order I defined for the Comparable interface, although since the records start out already in order, the result should still be the same.

    Stream<Record> stream = list.stream().parallel().sorted(
        (r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
    );
    

I can only get this to preserve the encounter order if I do one of the following on the Stream:

  • Don't call parallel().
  • Don't call any overload of sorted.

Interestingly, the parallel() without a sort preserved the order.

In both of the above cases, the output is:

Verified: true

My Java version is 1.8.0_05. This anomaly also occurs on Ideone, which appears to be running Java 8u25.

Update

I've upgraded my JDK to the latest version as of this writing, 1.8.0_45, and the problem is unchanged.

Question

Are the records in the resultant List (output) out of order because the sort is somehow not stable, because the encounter order is not preserved, or for some other reason?

How can I ensure that the encounter order is preserved when I create a parallel stream and sort it?

asked May 22 '15 21:05 by rgettman

1 Answer

It looks like Arrays.parallelSort isn't stable in some circumstances. Well spotted. The stream parallel sort is implemented in terms of Arrays.parallelSort, so it affects streams as well. Here's a simplified example:

import java.util.Arrays;
import java.util.stream.IntStream;

public class StableSortBug {
    static final int SIZE = 50_000;

    static class Record implements Comparable<Record> {
        final int sortVal;
        final int seqNum;

        Record(int i1, int i2) { sortVal = i1; seqNum = i2; }

        @Override
        public int compareTo(Record other) {
            return Integer.compare(this.sortVal, other.sortVal);
        }
    }

    static Record[] genArray() {
        Record[] array = new Record[SIZE];
        Arrays.setAll(array, i -> new Record(i / 10_000, i));
        return array;
    }

    static boolean verify(Record[] array) {
        return IntStream.range(1, array.length)
                        .allMatch(i -> array[i-1].seqNum + 1 == array[i].seqNum);
    }

    public static void main(String[] args) {
        Record[] array = genArray();
        System.out.println(verify(array));
        Arrays.sort(array);
        System.out.println(verify(array));
        Arrays.parallelSort(array);
        System.out.println(verify(array));
    }
}

On my machine (2 core x 2 threads) this prints the following:

true
true
false

Of course, it's supposed to print true three times. This is on the current JDK 9 dev builds. I wouldn't be surprised if it occurs in all the JDK 8 releases thus far, given what you've tried. Curiously, reducing the size or the divisor will change the behavior. A size of 20,000 and a divisor of 10,000 is stable, and a size of 50,000 and a divisor of 1,000 is also stable. It seems like the problem has to do with a sufficiently large run of values comparing equal versus the parallel split size.
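The size and divisor experiments described above can be repeated with a parametrized variant of the example. This is only a sketch: the class name StabilitySweep and the method parallelSortKeepsSequence are hypothetical, and the result for each pair depends on the JDK build and the number of cores, so no expected output is given.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// Hypothetical harness for trying different size/divisor combinations.
class StabilitySweep {
    static final class Rec implements Comparable<Rec> {
        final int sortVal;
        final int seqNum;

        Rec(int sortVal, int seqNum) { this.sortVal = sortVal; this.seqNum = seqNum; }

        @Override
        public int compareTo(Rec other) {
            // Compares on sortVal only, so long runs of elements compare equal.
            return Integer.compare(this.sortVal, other.sortVal);
        }
    }

    // Sorts an already-ordered array in parallel and reports whether
    // the sequence numbers are still consecutive afterwards.
    static boolean parallelSortKeepsSequence(int size, int divisor) {
        Rec[] array = new Rec[size];
        Arrays.setAll(array, i -> new Rec(i / divisor, i));
        Arrays.parallelSort(array);
        return IntStream.range(1, array.length)
                        .allMatch(i -> array[i - 1].seqNum + 1 == array[i].seqNum);
    }

    public static void main(String[] args) {
        // The three size/divisor pairs mentioned in the answer.
        int[][] cases = { {20_000, 10_000}, {50_000, 1_000}, {50_000, 10_000} };
        for (int[] c : cases) {
            System.out.println("size=" + c[0] + " divisor=" + c[1]
                + " stable=" + parallelSortKeepsSequence(c[0], c[1]));
        }
    }
}
```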

The OpenJDK issue JDK-8076446 covers this bug.
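One possible workaround, offered here as a sketch rather than as something the bug report prescribes, is to stop depending on stability altogether: add a tie-breaker so that no two distinct records compare equal. With a total order, a stable and an unstable sort must produce the same result. The names TieBreakSort, Rec, and BY_VAL_THEN_SEQ are hypothetical, and the approach assumes each record carries a unique position-like field (seqNum here, value1 in the question).

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical workaround sketch: sort by a total order instead of relying on stability.
class TieBreakSort {
    static final class Rec {
        final int sortVal;
        final int seqNum;
        Rec(int sortVal, int seqNum) { this.sortVal = sortVal; this.seqNum = seqNum; }
    }

    // Primary key first, then the original sequence number as a tie-breaker.
    static final Comparator<Rec> BY_VAL_THEN_SEQ =
        Comparator.comparingInt((Rec r) -> r.sortVal)
                  .thenComparingInt(r -> r.seqNum);

    static List<Rec> generate(int size, int divisor) {
        return IntStream.range(0, size)
                        .mapToObj(i -> new Rec(i / divisor, i))
                        .collect(Collectors.toList());
    }

    // Parallel sort with the tie-breaking comparator.
    static List<Rec> parallelSorted(List<Rec> input) {
        return input.parallelStream()
                    .sorted(BY_VAL_THEN_SEQ)
                    .collect(Collectors.toList());
    }

    // True if sequence numbers are consecutive, as in the verify method above.
    static boolean inSequence(List<Rec> list) {
        return IntStream.range(1, list.size())
                        .allMatch(i -> list.get(i - 1).seqNum + 1 == list.get(i).seqNum);
    }

    public static void main(String[] args) {
        List<Rec> sorted = parallelSorted(generate(115_000, 10_000));
        System.out.println(inSequence(sorted)); // prints true
    }
}
```

If the records have no such unique field, the tie-breaker has nothing to compare, and sorting sequentially before any parallel processing is the simpler alternative.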

answered Sep 29 '22 07:09 by Stuart Marks