Let's say I have a List<Integer> ints = new ArrayList<>(); and I want to add values to it and compare the results of parallel execution using forEach() and Collectors.toList().
First I add some values to this list from a sequential IntStream using forEach:
IntStream.range(0,10).boxed().forEach(ints::add);
And I get the correct result:
ints ==> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Now I .clear() the list and do the same thing in parallel:
IntStream.range(0,10).parallel().boxed().forEach(ints::add);
Now, due to multithreading, I get an incorrect result:
ints ==> [6, 5, 8, 9, 7, 2, 4, 3, 1, 0]
Now I switch to collecting the same stream of Integers:
IntStream.range(0,10).parallel().boxed().collect(Collectors.toList());
And I get the correct result:
ints ==> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Question:
Why do the two parallel executions produce different results, and why does the Collector produce the correct result? If forEach produces a random order, the Collector should too. I didn't specify any sorting, and I think that internally it adds to a list just like I did manually using forEach. Since it does that in parallel, its add method should receive the values in an unspecified order. Testing was done in JShell.
EDIT: This is not a duplicate. I understand the linked question. Why does the Collector produce the correct result? If it produced another random order, I would not be asking.
Parallel streams can actually slow you down. A parallel stream breaks the work into subproblems which then run on separate threads; these can go to different cores and their results are combined when they're done. This all happens under the hood using the fork/join framework.
First, note that parallelism offers no benefit other than the possibility of faster execution when more cores are available. A parallel execution will always involve more work than a sequential one, because in addition to solving the problem it also has to dispatch and coordinate sub-tasks.
In the case of a parallel stream, several worker threads are spawned and managed internally through the fork/join framework; by default, parallel streams run their tasks on the common pool obtained via the static ForkJoinPool.commonPool().
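As a rough check (a small sketch; the class name CommonPoolInfo is just for illustration), you can ask the common pool for its parallelism, which typically defaults to one less than the number of available cores:
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {
    public static void main(String[] args) {
        // Parallel streams run on the common ForkJoinPool by default.
        System.out.println("pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("available cores:  " + Runtime.getRuntime().availableProcessors());
    }
}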
The collect operation would produce unordered output if the Collector you passed it had different characteristics, that is, if the CONCURRENT and UNORDERED flags were set (see Collector.characteristics()).
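A quick way to check is to ask the collectors for their characteristics directly, for example in JShell (the exact sets are an implementation detail; the values shown are typical of recent OpenJDK builds):
import java.util.stream.Collectors;

Collectors.toList().characteristics()   // e.g. [IDENTITY_FINISH] -- neither CONCURRENT nor UNORDERED
Collectors.toSet().characteristics()    // e.g. [UNORDERED, IDENTITY_FINISH]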
Under the hood Collectors.toList() constructs a Collector roughly equivalent to this:
Collector.of(
    // Supplier of accumulators
    ArrayList::new,
    // Accumulation operation
    List::add,
    // Combine accumulators
    (left, right) -> {
        left.addAll(right);
        return left;
    }
)
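Plugged into the original example (a sketch; JShell imports java.util.* and java.util.stream.* by default, and the explicit <Integer, List<Integer>> type witness is only there to help inference), this hand-rolled collector preserves encounter order even in parallel, just like Collectors.toList():
List<Integer> ints = IntStream.range(0, 10)
        .parallel()
        .boxed()
        .collect(Collector.<Integer, List<Integer>>of(
                ArrayList::new,
                List::add,
                (left, right) -> { left.addAll(right); return left; }));
// ints ==> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]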
A bit of logging reveals the lengths the collect operation goes to in order to maintain thread safety and stream order:
Collector.of(
    () -> {
        System.out.printf("%s supplying\n", Thread.currentThread().getName());
        return new ArrayList<>();
    },
    (l, o) -> {
        System.out.printf("%s accumulating %s to %s\n", Thread.currentThread().getName(), o, l);
        l.add(o);
    },
    (l1, l2) -> {
        System.out.printf("%s combining %s & %s\n", Thread.currentThread().getName(), l1, l2);
        l1.addAll(l2);
        return l1;
    }
)
logs:
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 2 to []
ForkJoinPool-1-worker-1 accumulating 6 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 4 to []
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 5 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 3 to []
ForkJoinPool-1-worker-0 combining [3] & [4]
ForkJoinPool-1-worker-0 combining [2] & [3, 4]
ForkJoinPool-1-worker-1 combining [5] & [6]
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-0 accumulating 1 to []
ForkJoinPool-1-worker-1 accumulating 8 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 9 to []
ForkJoinPool-1-worker-1 combining [8] & [9]
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 7 to []
ForkJoinPool-1-worker-1 combining [7] & [8, 9]
ForkJoinPool-1-worker-1 combining [5, 6] & [7, 8, 9]
ForkJoinPool-1-worker-0 accumulating 0 to []
ForkJoinPool-1-worker-0 combining [0] & [1]
ForkJoinPool-1-worker-0 combining [0, 1] & [2, 3, 4]
ForkJoinPool-1-worker-0 combining [0, 1, 2, 3, 4] & [5, 6, 7, 8, 9]
You can see that each read from the stream is written to a new accumulator, and that they are carefully combined to maintain order.
If we set the CONCURRENT and UNORDERED characteristic flags, the collect method is free to take shortcuts; only one accumulator is allocated and ordered combination is unnecessary.
Using:
Collector.of(
    () -> {
        System.out.printf("%s supplying\n", Thread.currentThread().getName());
        return Collections.synchronizedList(new ArrayList<>());
    },
    (l, o) -> {
        System.out.printf("%s accumulating %s to %s\n", Thread.currentThread().getName(), o, l);
        l.add(o);
    },
    (l1, l2) -> {
        System.out.printf("%s combining %s & %s\n", Thread.currentThread().getName(), l1, l2);
        l1.addAll(l2);
        return l1;
    },
    Collector.Characteristics.CONCURRENT,
    Collector.Characteristics.UNORDERED
)
Logs:
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 6 to []
ForkJoinPool-1-worker-0 accumulating 2 to [6]
ForkJoinPool-1-worker-1 accumulating 5 to [6, 2]
ForkJoinPool-1-worker-0 accumulating 4 to [6, 2, 5]
ForkJoinPool-1-worker-0 accumulating 3 to [6, 2, 5, 4]
ForkJoinPool-1-worker-0 accumulating 1 to [6, 2, 5, 4, 3]
ForkJoinPool-1-worker-0 accumulating 0 to [6, 2, 5, 4, 3, 1]
ForkJoinPool-1-worker-1 accumulating 8 to [6, 2, 5, 4, 3, 1, 0]
ForkJoinPool-1-worker-0 accumulating 7 to [6, 2, 5, 4, 3, 1, 0, 8]
ForkJoinPool-1-worker-1 accumulating 9 to [6, 2, 5, 4, 3, 1, 0, 8, 7]
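For comparison, the concurrent collectors that ship with the JDK advertise exactly these flags, for example (output typical of OpenJDK; the characteristics sets are an implementation detail):
import java.util.function.Function;
import java.util.stream.Collectors;

Collectors.toConcurrentMap(Function.identity(), Function.identity()).characteristics()
// e.g. [CONCURRENT, UNORDERED, IDENTITY_FINISH]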
First things first, forEach is documented as:
The behavior of this operation is explicitly nondeterministic
So in a future version of the JDK, even your non-parallel code could produce "incorrect", that is out-of-order, results. Under the current implementation only the parallel version produces that kind of result; but again, this is no guarantee: forEach is free to do whatever it wants internally, unlike forEachOrdered, for example.
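A quick way to see the contrast (a sketch relying on JShell's default imports): forEachOrdered respects encounter order even on a parallel stream, while forEach does not:
IntStream.range(0, 10).parallel().forEach(i -> System.out.print(i + " "));
// order varies between runs, e.g. 6 5 8 9 7 2 4 3 1 0
System.out.println();
IntStream.range(0, 10).parallel().forEachOrdered(i -> System.out.print(i + " "));
// always prints 0 1 2 3 4 5 6 7 8 9
System.out.println();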
Preserving order or not is not a property of sequential vs. parallel; it depends solely on whether an operation breaks that order or not (like explicitly calling unordered(), for example).
Collectors.toList, on the other hand, is a terminal operation that preserves the order. Generally, unless a terminal operation is explicit in its documentation about order, it will preserve it. When order is not guaranteed, the documentation says so; see Stream::generate, for example:
Returns an infinite sequential unordered stream.
That being said, there are two orders in general: the order in which intermediate operations are processed and the order of the terminal operation. The first is not defined; you can modify your example and check:
IntStream.range(0, 10)
        .parallel()
        .peek(System.out::println) // out-of-order printing
        .boxed()
        .collect(Collectors.toList());
while the terminal operation's order is preserved.
And the last point is that with this:
....parallel().forEach(ints::add)
you simply got lucky to even see all the elements in the first place. You are adding elements from multiple threads to a non-thread-safe collection (ArrayList); you could easily have missed elements or have nulls in your ints. I bet that running it a number of times would prove this.
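A sketch of that experiment, runnable as-is in JShell (the 100_000-element range is an arbitrary choice to make the race likely): repeated runs typically lose elements, and may throw an exception, because ArrayList is not thread-safe:
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

for (int run = 0; run < 5; run++) {
    List<Integer> ints = new ArrayList<>();
    try {
        IntStream.range(0, 100_000).parallel().boxed().forEach(ints::add);
        System.out.println("size = " + ints.size()); // frequently less than 100000
    } catch (RuntimeException e) {
        // typically an ArrayIndexOutOfBoundsException from the unsynchronized ArrayList
        System.out.println("race detected: " + e);
    }
}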
Even if you switch to, say, Collections.synchronizedList(yourList), the order in which the elements appear is still undefined, for the reasons stated above about forEach.