 

parallel processing with infinite stream in Java

Why does the code below not print any output, whereas if we remove parallel(), it prints 0 and 1?

IntStream.iterate(0, i -> ( i + 1 ) % 2)
         .parallel()
         .distinct()
         .limit(10)
         .forEach(System.out::println);

I know that ideally limit should be placed before distinct, but my question is more about the difference caused by adding parallel processing.

asked Feb 03 '16 by krmanish007


2 Answers

The real cause is that an ordered parallel .distinct() is a full barrier operation, as described in the documentation:

Preserving stability for distinct() in parallel pipelines is relatively expensive (requires that the operation act as a full barrier, with substantial buffering overhead), and stability is often not needed.

The "full barrier operation" means that all the upstream operations must be performed before the downstream can start. There are only two full barrier operations in Stream API: .sorted() (every time) and .distinct() (in ordered parallel case). As you have non short-circuit infinite stream supplied to the .distinct() you end up with infinite loop. By contract .distinct() cannot just emit elements to the downstream in any order: it should always emit the first repeating element. While it's theoretically possible to implement parallel ordered .distinct() better, it would be much more complex implementation.

As for a solution, @user140547 is right: add .unordered() before .distinct(). This switches the distinct() algorithm to the unordered one (which just uses a shared ConcurrentHashMap to store all the observed elements and emits every new element to the downstream). Note that adding .unordered() after .distinct() will not help.
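
A sketch of that placement point (the class name UnorderedPlacementDemo is illustrative only): with unordered() before distinct(), 0 and 1 get printed; with unordered() after distinct(), the ordered full-barrier distinct() still runs first and nothing is printed.

    import java.util.stream.IntStream;

    public class UnorderedPlacementDemo {
        public static void main(String[] args) {
            // unordered() upstream of distinct(): the parallel pipeline uses
            // the unordered (ConcurrentHashMap-based) distinct() and prints
            // 0 and 1, in either order. It still never terminates, because
            // only two distinct values exist and limit(10) is never satisfied.
            IntStream.iterate(0, i -> (i + 1) % 2)
                     .parallel()
                     .unordered()
                     .distinct()
                     .limit(10)
                     .forEach(System.out::println);

            // unordered() downstream of distinct(): distinct() is still
            // ordered and acts as a full barrier on the infinite source, so
            // nothing is ever printed. Commented out because it hangs.
            // IntStream.iterate(0, i -> (i + 1) % 2)
            //          .parallel()
            //          .distinct()
            //          .unordered()
            //          .limit(10)
            //          .forEach(System.out::println);
        }
    }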

answered Oct 30 '22 by Tagir Valeev

Stream.iterate returns 'an infinite sequential ordered Stream', so making such a stream parallel is not too useful.

According to the description of the Stream package:

For parallel streams, relaxing the ordering constraint can sometimes enable more efficient execution. Certain aggregate operations, such as filtering duplicates (distinct()) or grouped reductions (Collectors.groupingBy()) can be implemented more efficiently if ordering of elements is not relevant. Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism. In cases where the stream has an encounter order, but the user does not particularly care about that encounter order, explicitly de-ordering the stream with unordered() may improve parallel performance for some stateful or terminal operations. However, most stream pipelines, such as the "sum of weight of blocks" example above, still parallelize efficiently even under ordering constraints.

This seems to be what is happening in your case; using unordered(), it prints 0 and 1:

    IntStream.iterate(0, i -> (i + 1) % 2)
            .parallel()
            .unordered()
            .distinct()
            .limit(10)
            .forEach(System.out::println);
answered Oct 30 '22 by user140547