I have a Record class:
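public class Record implements Comparable<Record>
{
    private String myCategory1;
    private int myCategory2;
    private String myCategory3;
    private String myCategory4;
    private int myValue1;
    private double myValue2;

    public Record(String category1, int category2, String category3, String category4,
        int value1, double value2)
    {
        myCategory1 = category1;
        myCategory2 = category2;
        myCategory3 = category3;
        myCategory4 = category4;
        myValue1 = value1;
        myValue2 = value2;
    }

    public int getCategory2() { return myCategory2; }
    public int getValue1() { return myValue1; }
    // Other getters here

    // Assumed natural ordering (not shown in the original): compare the category fields in turn
    @Override
    public int compareTo(Record other)
    {
        int c = myCategory1.compareTo(other.myCategory1);
        if (c == 0) c = Integer.compare(myCategory2, other.myCategory2);
        if (c == 0) c = myCategory3.compareTo(other.myCategory3);
        if (c == 0) c = myCategory4.compareTo(other.myCategory4);
        return c;
    }
}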
I create a large list of records. Only the second and fifth values, i / 10000 and i, are used later, by the getters getCategory2() and getValue1() respectively.
List<Record> list = new ArrayList<>();
for (int i = 0; i < 115000; i++)
{
    list.add(new Record("A", i / 10000, "B", "C", i, (double) i / 100 + 1));
}
Note that the first 10,000 records have a category2 of 0, the next 10,000 have 1, etc., while the value1 values are 0-114999 sequentially.
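The sizes of these runs of equal category2 values turn out to matter, so it helps to count them. The sketch below regenerates only the keys (i / 10000) and does not build full Record objects. The class name CategoryRuns is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CategoryRuns {
    // Counts how many indices 0..size-1 share each key i / divisor (category2 in the question).
    static Map<Integer, Long> runLengths(int size, int divisor) {
        return IntStream.range(0, size)
                .boxed()
                .collect(Collectors.groupingBy(i -> i / divisor, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Keys 0 through 10 each map to 10000; key 11 maps to 5000 (indices 110000..114999).
        System.out.println(runLengths(115_000, 10_000));
    }
}
```

Every run except the last is 10,000 elements long, so each of these runs is a long stretch of elements that compare equal under a category2 ordering.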
I create a Stream that is both parallel and sorted.
Stream<Record> stream = list.stream()
    .parallel()
    .sorted(
        //(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
    )
    //.parallel()
    ;
I have a ForkJoinPool that maintains 8 threads, which is the number of cores I have on my PC.
ForkJoinPool pool = new ForkJoinPool(8);
I use the trick described here to submit a stream processing task to my own ForkJoinPool instead of the common ForkJoinPool.
List<Record> output = pool.submit(() ->
    stream.collect(Collectors.toList())
).get();
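Below is a self-contained sketch of the same custom-pool pattern. It adds exception handling and shuts the pool down afterwards. It uses an Integer range instead of Record, and the names are hypothetical. To my knowledge, the Stream API does not document that a pipeline's subtasks stay inside the pool that submitted it. That behavior comes from the current implementation.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CustomPoolDemo {
    // Submits a parallel stream pipeline to a dedicated ForkJoinPool and always shuts the pool down.
    static List<Integer> collectInPool(int parallelism, int size) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() ->
                    IntStream.range(0, size)
                            .boxed()
                            .parallel()
                            .collect(Collectors.toList())
            ).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> result = collectInPool(8, 115_000);
        // No sorted() here, so collect(toList()) keeps encounter order: prints "0 .. 114999".
        System.out.println(result.get(0) + " .. " + result.get(result.size() - 1));
    }
}
```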
I expected that the parallel sorted operation would respect the encounter order of the stream, and that it would be a stable sort, because the Spliterator returned by ArrayList is ORDERED.
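Both premises behind this expectation can be checked on a sequential pipeline. The Stream.sorted() documentation states that the sort is stable for ordered streams. The sketch below confirms two things: an ArrayList spliterator reports ORDERED, and a sequential sorted() keeps equal keys in encounter order. It uses int[] pairs {key, sequence} as a stand-in for Record, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;

public class OrderedCheck {
    // True if an ArrayList's spliterator reports the ORDERED characteristic.
    static boolean arrayListIsOrdered() {
        List<Integer> list = new ArrayList<>();
        list.add(1);
        return list.spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    // Sorts {i / divisor, i} pairs sequentially by key only, then checks that the
    // sequence numbers are still 0, 1, 2, ... (i.e. equal keys kept their order).
    static boolean sequentialSortIsStable(int size, int divisor) {
        List<int[]> pairs = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            pairs.add(new int[] {i / divisor, i});
        }
        List<int[]> sorted = pairs.stream()
                .sorted(Comparator.comparingInt(p -> p[0]))
                .collect(Collectors.toList());
        for (int i = 1; i < sorted.size(); i++) {
            if (sorted.get(i - 1)[1] + 1 != sorted.get(i)[1]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(arrayListIsOrdered());                       // true
        System.out.println(sequentialSortIsStable(115_000, 10_000));    // true
    }
}
```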
However, simple code that prints out the elements of the resulting List output in order shows that this is not quite the case.
for (Record record : output)
{
    System.out.println(record.getValue1());
}
Output, condensed:
0
1
2
3
...
69996
69997
69998
69999
71875 // discontinuity!
71876
71877
71878
...
79058
79059
79060
79061
70000 // discontinuity!
70001
70002
70003
...
71871
71872
71873
71874
79062 // discontinuity!
79063
79064
79065
79066
...
114996
114997
114998
114999
The size() of output is 115000, and all elements appear to be there, just in a slightly different order.
So I wrote some checking code to see if the sort was stable. If it is stable, then all of the value1 values should remain in order. This code verifies the order, printing any discrepancies.
int prev = -1;
boolean verified = true;
for (Record record : output)
{
    int curr = record.getValue1();
    if (prev != -1)
    {
        if (prev + 1 != curr)
        {
            System.out.println("Warning: " + prev + " followed by " + curr + "!");
            verified = false;
        }
    }
    prev = curr;
}
System.out.println("Verified: " + verified);
Output:
Warning: 69999 followed by 71875!
Warning: 79061 followed by 70000!
Warning: 71874 followed by 79062!
Warning: 99999 followed by 100625!
Warning: 107811 followed by 100000!
Warning: 100624 followed by 107812!
Verified: false
This condition persists if I do any of the following:
Replace the ForkJoinPool with a ThreadPoolExecutor.
ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
Use the common ForkJoinPool by processing the Stream directly.
List<Record> output = stream.collect(Collectors.toList());
Call parallel() after I call sorted().
Stream<Record> stream = list.stream().sorted().parallel();
Call parallelStream() instead of stream().parallel().
Stream<Record> stream = list.parallelStream().sorted();
Sort using a Comparator. Note that this sort criterion is different from the "natural" order I defined for the Comparable interface. However, because the records start out already in order, the result should still be the same.
Stream<Record> stream = list.stream().parallel().sorted(
    (r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
);
I can only get this to preserve the encounter order if I leave out one of the following on the Stream:
parallel()
sorted()
Interestingly, parallel() without a sort preserved the order.
In both of the above cases, the output is:
Verified: true
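The order-preserving case without a sort matches the ordering rules in the java.util.stream package documentation. For an ordered source, collect(toList()) assembles the partial results in encounter order, even in parallel. The sketch below shows that case. It uses Integers instead of Record, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelNoSort {
    // Parallel map + collect(toList()) on an ordered ArrayList source, no sorted() step.
    static boolean parallelKeepsOrder(int size) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            source.add(i);
        }
        List<Integer> out = source.parallelStream()
                .map(i -> i * 2)
                .collect(Collectors.toList());
        for (int i = 0; i < size; i++) {
            if (out.get(i) != i * 2) {
                return false;  // an element landed out of encounter order
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(parallelKeepsOrder(115_000));  // true
    }
}
```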
My Java version is 1.8.0_05. This anomaly also occurs on Ideone, which appears to be running Java 8u25.
Update
I've upgraded my JDK to the latest version as of this writing, 1.8.0_45, and the problem is unchanged.
Question
Is the record order in the resulting List (output) wrong because the sort is somehow not stable, because the encounter order is not preserved, or for some other reason?
How can I ensure that the encounter order is preserved when I create a parallel stream and sort it?
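One workaround is to make the comparator a total order, so that stability never comes into play. This is offered as a hedged suggestion, not a fix for the underlying cause. It assumes the data has a field that already reflects encounter order (value1 does here). Rec and TieBreakSort are hypothetical stand-ins that keep only the two fields the question uses.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class TieBreakSort {
    // Stand-in for the question's Record, keeping only category2 and value1.
    static final class Rec {
        private final int category2;
        private final int value1;

        Rec(int category2, int value1) {
            this.category2 = category2;
            this.value1 = value1;
        }

        int getCategory2() { return category2; }
        int getValue1() { return value1; }
    }

    // Parallel sort by category2, breaking ties by value1. Assumption: value1 grows with the
    // original encounter order, so no two elements compare equal and stability is irrelevant.
    static List<Rec> sortWithTieBreak(List<Rec> input) {
        return input.parallelStream()
                .sorted(Comparator.comparingInt(Rec::getCategory2)
                        .thenComparingInt(Rec::getValue1))
                .collect(Collectors.toList());
    }

    // True if value1 runs 0, 1, 2, ... without gaps, like the question's verifier.
    static boolean inSequence(List<Rec> output) {
        for (int i = 0; i < output.size(); i++) {
            if (output.get(i).getValue1() != i) {
                return false;
            }
        }
        return true;
    }

    // Builds data shaped like the question's list: category2 = i / 10000, value1 = i.
    static List<Rec> sample(int size) {
        List<Rec> list = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            list.add(new Rec(i / 10_000, i));
        }
        return list;
    }

    public static void main(String[] args) {
        System.out.println(inSequence(sortWithTieBreak(sample(115_000))));  // true
    }
}
```

The tie-breaker costs one extra int comparison per equal pair. It only helps when such an order-carrying key exists. Otherwise, one option would be to pair each element with its index before sorting.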
It looks like Arrays.parallelSort isn't stable in some circumstances. Well spotted. The stream parallel sort is implemented in terms of Arrays.parallelSort, so it affects streams as well. Here's a simplified example:
import java.util.Arrays;
import java.util.stream.IntStream;

public class StableSortBug {
    static final int SIZE = 50_000;

    static class Record implements Comparable<Record> {
        final int sortVal;
        final int seqNum;

        Record(int i1, int i2) { sortVal = i1; seqNum = i2; }

        @Override
        public int compareTo(Record other) {
            return Integer.compare(this.sortVal, other.sortVal);
        }
    }

    static Record[] genArray() {
        Record[] array = new Record[SIZE];
        Arrays.setAll(array, i -> new Record(i / 10_000, i));
        return array;
    }

    static boolean verify(Record[] array) {
        return IntStream.range(1, array.length)
                .allMatch(i -> array[i-1].seqNum + 1 == array[i].seqNum);
    }

    public static void main(String[] args) {
        Record[] array = genArray();
        System.out.println(verify(array));
        Arrays.sort(array);
        System.out.println(verify(array));
        Arrays.parallelSort(array);
        System.out.println(verify(array));
    }
}
On my machine (2 cores x 2 threads) this prints the following:
true
true
false
Of course, it's supposed to print true three times. This is on the current JDK 9 dev builds. I wouldn't be surprised if it occurs in all the JDK 8 releases thus far, given what you've tried. Curiously, reducing the size or the divisor changes the behavior: a size of 20,000 with a divisor of 10,000 is stable, and a size of 50,000 with a divisor of 1,000 is also stable. It seems the problem has to do with a sufficiently large run of values comparing equal relative to the parallel split size.
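The size and divisor observation can be probed with a small harness that runs both sorts over several configurations. This is a sketch with hypothetical names. Only the Arrays.sort column is predictable. The parallelSort column depends on the JDK build and the core count. Per its documentation, parallelSort hands arrays below a minimum granularity straight to Arrays.sort, so tiny inputs never exercise the parallel path.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class StabilityProbe {
    // Same shape as the answer's Record: sort key plus original sequence number.
    static final class Rec implements Comparable<Rec> {
        final int sortVal;
        final int seqNum;

        Rec(int sortVal, int seqNum) { this.sortVal = sortVal; this.seqNum = seqNum; }

        @Override
        public int compareTo(Rec other) { return Integer.compare(sortVal, other.sortVal); }
    }

    static Rec[] gen(int size, int divisor) {
        Rec[] a = new Rec[size];
        Arrays.setAll(a, i -> new Rec(i / divisor, i));
        return a;
    }

    static boolean inSequence(Rec[] a) {
        return IntStream.range(1, a.length).allMatch(i -> a[i - 1].seqNum + 1 == a[i].seqNum);
    }

    static boolean sequentialStable(int size, int divisor) {
        Rec[] a = gen(size, divisor);
        Arrays.sort(a);
        return inSequence(a);
    }

    static boolean parallelStable(int size, int divisor) {
        Rec[] a = gen(size, divisor);
        Arrays.parallelSort(a);
        return inSequence(a);
    }

    public static void main(String[] args) {
        int[][] configs = {{20_000, 10_000}, {50_000, 1_000}, {50_000, 10_000}, {115_000, 10_000}};
        for (int[] c : configs) {
            // sort= should always be true; parallelSort= varies with JDK build and core count.
            System.out.println("size=" + c[0] + " divisor=" + c[1]
                    + " sort=" + sequentialStable(c[0], c[1])
                    + " parallelSort=" + parallelStable(c[0], c[1]));
        }
    }
}
```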
The OpenJDK issue JDK-8076446 covers this bug.
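On JDK builds affected by this issue, another option keeps parallelism for the rest of the pipeline but not for the sort. This assumes the sort itself does not need to run in parallel. Sort with List.sort first (for an ArrayList this delegates to Arrays.sort, which is documented as stable), and only afterwards open a parallel stream for the remaining work. Int pairs stand in for Record, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class SortThenParallel {
    // Builds {sortKey, seqNum} pairs shaped like the question's data: key = i / 10000, seq = i.
    static List<int[]> sample(int size) {
        List<int[]> list = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            list.add(new int[] {i / 10_000, i});
        }
        return list;
    }

    // Stable sequential sort on a copy, then parallel per-element work.
    // collect(toList()) keeps the sorted encounter order, so seqNums come back ascending.
    static List<Integer> sortThenProcess(List<int[]> input) {
        List<int[]> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparingInt(p -> p[0]));
        return sorted.parallelStream()
                .map(p -> p[1])
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> seq = sortThenProcess(sample(115_000));
        boolean ok = true;
        for (int i = 0; i < seq.size(); i++) {
            ok &= seq.get(i) == i;
        }
        System.out.println("in order: " + ok);  // in order: true
    }
}
```

The trade-off is that the sort runs on one thread. For the question's 115,000 records, that may well be acceptable, but that is a judgment call rather than a measured result.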