Why does the code below print no output, whereas if we remove parallel() it prints 0, 1?
IntStream.iterate(0, i -> ( i + 1 ) % 2)
.parallel()
.distinct()
.limit(10)
.forEach(System.out::println);
I know that limit should ideally be placed before distinct, but my question is about the difference caused by adding parallel processing.
We can create an infinite stream of elements of any custom type by passing a Supplier to the generate() method of Stream.
Calling a terminal method on an infinite Stream causes the Stream to enter an infinite loop unless the pipeline short-circuits. The limit method of a Stream can be used to limit the number of terms of the Stream that Java processes. This example generates a Stream of all natural numbers, starting with the number 1.
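A minimal sketch of both ideas, assuming Java 9 or later (for List.of in the checks; the stream calls themselves exist since Java 8). The class and method names (InfiniteStreamDemo, firstNaturals, repeated) are made up for illustration. It builds an infinite stream with Stream.iterate and with Stream.generate, and bounds each one with limit so the terminal operation can finish:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; the names are illustrative only.
class InfiniteStreamDemo {

    // First n natural numbers, starting at 1.
    static List<Integer> firstNaturals(int n) {
        return Stream.iterate(1, i -> i + 1)   // infinite: 1, 2, 3, ...
                .limit(n)                      // short-circuits after n elements
                .collect(Collectors.toList());
    }

    // n copies of a value, produced by a Supplier passed to Stream.generate.
    static List<String> repeated(String value, int n) {
        return Stream.generate(() -> value)    // infinite: value, value, ...
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstNaturals(5));  // [1, 2, 3, 4, 5]
        System.out.println(repeated("x", 3));  // [x, x, x]
    }
}
```

Without the limit call, collect would never return, because the source never runs out of elements.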
The real cause is that ordered parallel .distinct() is a full barrier operation, as described in the documentation:
Preserving stability for distinct() in parallel pipelines is relatively expensive (requires that the operation act as a full barrier, with substantial buffering overhead), and stability is often not needed.
A "full barrier operation" means that all the upstream operations must complete before the downstream can start. There are only two full barrier operations in the Stream API: .sorted() (every time) and .distinct() (in the ordered parallel case). Because you supply an infinite stream that never short-circuits to .distinct(), you end up with an infinite loop. By contract, ordered .distinct() cannot just emit elements to the downstream in any order: for each repeated element it must always emit the first occurrence. While it's theoretically possible to implement parallel ordered .distinct() better, the implementation would be much more complex.
As for the solution, @user140547 is right: add .unordered() before .distinct(). This switches distinct() to the unordered algorithm (which just uses a shared ConcurrentHashMap to store all the observed elements and emits every new element to the downstream). Note that adding .unordered() after .distinct() will not help.
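The stability contract described above can be observed safely on a finite source. The following is a sketch only, assuming Java 9 or later for List.of; the class and method names (DistinctOrderingDemo, orderedDistinct, unorderedDistinct) are invented for illustration. The ordered parallel pipeline must return the first occurrences in encounter order, while the unordered one only promises the same set of elements:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical demo class; the names are illustrative only.
class DistinctOrderingDemo {

    static final List<String> INPUT = List.of("b", "a", "b", "c", "a", "c");

    // Ordered parallel distinct(): stable, keeps first occurrences in encounter order.
    static List<String> orderedDistinct() {
        return INPUT.parallelStream()
                .distinct()
                .collect(Collectors.toList());
    }

    // Unordered parallel distinct(): same elements, but no ordering promise,
    // so the result is collected into a Set rather than compared as a List.
    static Set<String> unorderedDistinct() {
        return INPUT.parallelStream()
                .unordered()                   // must come before distinct()
                .distinct()
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(orderedDistinct());    // [b, a, c]
        System.out.println(unorderedDistinct());  // contains a, b, c in some order
    }
}
```

On a finite source both variants terminate; the difference only turns into a hang when the ordered variant has to wait for an infinite upstream to finish.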
Stream.iterate returns 'an infinite sequential ordered Stream'. Therefore, making a sequential stream parallel is not too useful.
According to the description of the Stream package:
For parallel streams, relaxing the ordering constraint can sometimes enable more efficient execution. Certain aggregate operations, such as filtering duplicates (distinct()) or grouped reductions (Collectors.groupingBy()) can be implemented more efficiently if ordering of elements is not relevant. Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism. In cases where the stream has an encounter order, but the user does not particularly care about that encounter order, explicitly de-ordering the stream with unordered() may improve parallel performance for some stateful or terminal operations. However, most stream pipelines, such as the "sum of weight of blocks" example above, still parallelize efficiently even under ordering constraints.
This seems to apply in your case: with unordered(), it prints 0, 1.
IntStream.iterate(0, i -> (i + 1) % 2)
.parallel()
.unordered()
.distinct()
.limit(10)
.forEach(System.out::println);
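As the question itself notes, limit is better placed before distinct. Since distinct() can only ever pass two values (0 and 1) for this source, a limit(10) placed after it is never reached, so the pipeline should keep consuming the infinite source even after printing both values. A sketch of the reordered pipeline, with a hypothetical class and method name (LimitBeforeDistinctDemo, distinctOfFirst), shown sequentially to keep the behaviour easy to reason about:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class; the names are illustrative only.
class LimitBeforeDistinctDemo {

    // Distinct values among the first n elements of 0, 1, 0, 1, ...
    static List<Integer> distinctOfFirst(int n) {
        return IntStream.iterate(0, i -> (i + 1) % 2)
                .limit(n)        // bound the infinite source first
                .distinct()      // now operates on a finite stream
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(distinctOfFirst(10));  // [0, 1]
    }
}
```

With the source bounded first, distinct() sees a finite stream and the terminal operation returns.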