Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java 8 Stream - parallel execution - different result - why?

Let's say i have a List<Integer> ints = new ArrayList<>(); and i want to add values to it and compare the results of parallel execution using forEach() and Collectors.toList().

First i add to this list some values from an sequential IntStream and forEach:

 IntStream.range(0,10).boxed().forEach(ints::add);

And i get the correct result:

ints ==> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Now i .clear() the list and do the same thing in parallel:

IntStream.range(0,10).parallel().boxed().forEach(ints::add);

Now due to multithreading i get the incorrect result:

ints ==> [6, 5, 8, 9, 7, 2, 4, 3, 1, 0]

Now i switch to collecting the same Stream of Integers:

IntStream.range(0,10).parallel().boxed().collect(Collectors.toList());

And i get the correct result:

ints ==> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Question: Why does the two parallel executions produce different result's and why is the Collector producing the correct result?

If forEach produces a random result the Collector should too. I didn't specify any sorting and i think internally he is adding to a list like i did manually using forEach. Since he's doing it in parallel he's add method should get the values in unspecified order. Testing done i JShell.

EDIT: No duplicate here. I understand the linked question. WHy does the Collector produce the correct result? If he would be producing another random result i would not be asking.

like image 243
Robert Niestroj Avatar asked Jul 23 '18 20:07

Robert Niestroj


People also ask

What is the disadvantage of parallel stream in Java 8?

Parallel Streams can actually slow you down It breaks them into subproblems which then run on separate threads for processing, these can go to different cores and then get combined when they're done. This all happens under the hood using the fork/join framework.

Is parallel stream always efficient?

First, note that parallelism offers no benefits other than the possibility of faster execution when more cores are available. A parallel execution will always involve more work than a sequential one, because in addition to solving the problem, it also has to perform dispatching and coordinating of sub-tasks.

How many threads can be executed at a time in parallel stream?

In case of Parallel stream,4 threads are spawned simultaneously and it internally using Fork and Join pool to create and manage threads. Parallel streams create ForkJoinPool instance via static ForkJoinPool.


2 Answers

The collect operation would produce unordered output if the Collector you passed it had different characteristics. That is, if the CONCURRENT and UNORDERED flags were set (see Collector.characteristics()).

Under the hood Collectors.toList() is constructing a Collector roughly equivalent to this:

Collector.of(
    // Supplier of accumulators
    ArrayList::new,
    // Accumulation operation
    List::add,
    // Combine accumulators
    (left, right) -> {
        left.addAll(right);
        return left;
    }
)

A bit of logging reveals the lengths that the collect operation is going to to maintain thread safety and stream order:

Collector.of(
    () -> {
        System.out.printf("%s supplying\n", Thread.currentThread().getName());
        return new ArrayList<>();
    },
    (l, o) -> {
        System.out.printf("%s accumulating %s to %s\n", Thread.currentThread().getName(), o, l);
        l.add(o);
    },
    (l1, l2) -> {
        System.out.printf("%s combining %s & %s\n", Thread.currentThread().getName(), l1, l2);
        l1.addAll(l2);
        return l1;
    }
)

logs:

ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 2 to []
ForkJoinPool-1-worker-1 accumulating 6 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 4 to []
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 5 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 3 to []
ForkJoinPool-1-worker-0 combining [3] & [4]
ForkJoinPool-1-worker-0 combining [2] & [3, 4]
ForkJoinPool-1-worker-1 combining [5] & [6]
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-0 accumulating 1 to []
ForkJoinPool-1-worker-1 accumulating 8 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 9 to []
ForkJoinPool-1-worker-1 combining [8] & [9]
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 7 to []
ForkJoinPool-1-worker-1 combining [7] & [8, 9]
ForkJoinPool-1-worker-1 combining [5, 6] & [7, 8, 9]
ForkJoinPool-1-worker-0 accumulating 0 to []
ForkJoinPool-1-worker-0 combining [0] & [1]
ForkJoinPool-1-worker-0 combining [0, 1] & [2, 3, 4]
ForkJoinPool-1-worker-0 combining [0, 1, 2, 3, 4] & [5, 6, 7, 8, 9]

You can see that each read from the stream is written to a new accumulator, and that they are carefully combined to maintain order.

If we set the CONCURRENT and UNORDERED characteristic flags the collect method is free to take shortcuts; only one accumulator is allocated and ordered combination is unnecessary.

Using:

Collector.of(
    () -> {
        System.out.printf("%s supplying\n", Thread.currentThread().getName());
        return Collections.synchronizedList(new ArrayList<>());
    },
    (l, o) -> {
        System.out.printf("%s accumulating %s to %s\n", Thread.currentThread().getName(), o, l);
        l.add(o);
    },
    (l1, l2) -> {
        System.out.printf("%s combining %s & %s\n", Thread.currentThread().getName(), l1, l2);
        l1.addAll(l2);
        return l1;
    },
    Characteristics.CONCURRENT,
    Characteristics.UNORDERED
)

Logs:

ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 6 to []
ForkJoinPool-1-worker-0 accumulating 2 to [6]
ForkJoinPool-1-worker-1 accumulating 5 to [6, 2]
ForkJoinPool-1-worker-0 accumulating 4 to [6, 2, 5]
ForkJoinPool-1-worker-0 accumulating 3 to [6, 2, 5, 4]
ForkJoinPool-1-worker-0 accumulating 1 to [6, 2, 5, 4, 3]
ForkJoinPool-1-worker-0 accumulating 0 to [6, 2, 5, 4, 3, 1]
ForkJoinPool-1-worker-1 accumulating 8 to [6, 2, 5, 4, 3, 1, 0]
ForkJoinPool-1-worker-0 accumulating 7 to [6, 2, 5, 4, 3, 1, 0, 8]
ForkJoinPool-1-worker-1 accumulating 9 to [6, 2, 5, 4, 3, 1, 0, 8, 7]
like image 134
teppic Avatar answered Sep 28 '22 05:09

teppic


First things first, forEach is documented as :

The behavior of this operation is explicitly nondeterministic

So in a future version of jdk, even your non-parallel code could produce "incorrect" results, that is out-of-order results. Under the current implementation, only the parallel version will produce these kind of results; but again this is no guarantee, forEach is free to do whatever it wants internally, unlike forEachOrdered for example.

Preserving order or not is not a property of sequential or parallel, it solely depends on the operation that break this order or not; that's it (like explicitly calling unordered for example).

Collectors.toList on the other hand is a terminal operation that preserve the order. Generally, unless a terminal operation is explicit in it's documentation about order, it will preserver it. So for example, see Stream::generate:

Returns an infinite sequential unordered stream.

That being said, there are two orders in general, order in which intermediate operations are being processed and terminal operations are. The first ones are not defined, you can modify your example and check:

IntStream.range(0,10)
         .parallel()
         .peek(System.out::println) // out of order printing
         .boxed()
         .collect(Collectors.toList());

while the terminal operations order is preserved.

And the last point is that this:

....parallel().forEach(ints::add)

you simply got lucky to even see all the elements in the first place. You are adding from different threads multiple elements to a non-thread safe collection (ArrayList); you could have easily missed elements or have nulls in your ints. I bet that running this some number of times, would prove this.

Even if you switch to let's say Collections.synchronizedList(yourList), the order in which these will appear is still undefined, for the reasons stated above about forEach

like image 27
Eugene Avatar answered Sep 28 '22 03:09

Eugene