Do sorted and distinct immediately process the stream?

Imagine I have something that looks like this:

Stream<Integer> stream = Stream.of(2,1,3,5,6,7,9,11,10)
            .distinct()
            .sorted();

The javadocs for both distinct() and sorted() say that each is a "stateful intermediate operation". Does that mean that internally the stream will do something like create a hash set, add all the stream values, and then, on seeing sorted(), throw those values into a sorted list or sorted set? Or is it smarter than that?

In other words, does .distinct().sorted() cause Java to traverse the stream twice, or does Java delay that until a terminal operation is performed (such as .collect)?

Cogman asked Mar 14 '18 23:03




2 Answers

You have asked a loaded question, implying that there had to be a choice between two alternatives.

The stateful intermediate operations have to store data, in some cases up to the point of storing all elements before being able to pass an element downstream, but that doesn’t change the fact that this work is deferred until a terminal operation has been commenced.

It’s also not correct to say that it has to “traverse the stream twice”. There are entirely different traversals going on. In the case of sorted(), there is first the traversal of the source, filling an internal buffer that will be sorted, and second the traversal of that buffer. In the case of distinct(), no second traversal happens in sequential processing; the internal HashSet is just used to determine whether to pass an element downstream.

So when you run

Stream<Integer> stream = Stream.of(2,1,3,5,3)
    .peek(i -> System.out.println("source: "+i))
    .distinct()
    .peek(i -> System.out.println("distinct: "+i))
    .sorted()
    .peek(i -> System.out.println("sorted: "+i));
System.out.println("commencing terminal operation");
stream.forEachOrdered(i -> System.out.println("terminal: "+i));

it prints

commencing terminal operation
source: 2
distinct: 2
source: 1
distinct: 1
source: 3
distinct: 3
source: 5
distinct: 5
source: 3
sorted: 1
terminal: 1
sorted: 2
terminal: 2
sorted: 3
terminal: 3
sorted: 5
terminal: 5

showing that nothing happens before the terminal operation has been commenced and that elements from the source immediately pass the distinct() operation (unless being duplicates), whereas all elements are buffered in the sorted() operation before being passed downstream.
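The pass-through behaviour of distinct() in the sequential case can be approximated by hand with a filter backed by a HashSet. This is only a sketch of the idea, not the JDK's implementation; the class name DistinctSketch and the helper distinctViaFilter are made up for illustration, and the approach is only meant for sequential streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DistinctSketch {

    // Hypothetical helper: mimics sequential distinct() with a HashSet-backed filter.
    // Every event is appended to 'log' so the order of events can be inspected.
    static List<Integer> distinctViaFilter(List<Integer> input, List<String> log) {
        Set<Integer> seen = new HashSet<>();
        Stream<Integer> lazy = input.stream()
                .peek(i -> log.add("source: " + i))
                .filter(seen::add); // Set.add returns false for elements already seen
        // Nothing has been pulled from the source yet; the pipeline is only described.
        log.add("commencing terminal operation");
        return lazy.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Integer> result = distinctViaFilter(Arrays.asList(2, 1, 3, 5, 3), log);
        log.forEach(System.out::println);
        System.out.println(result);
    }
}
```

The first log entry is the "commencing terminal operation" line, and each element is tested against the set exactly once as it flows past, so no second traversal is involved.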

It can further be shown that distinct() does not need to traverse the entire stream:

Stream.of(2,1,1,3,5,6,7,9,2,1,3,5,11,10)
    .peek(i -> System.out.println("source: "+i))
    .distinct()
    .peek(i -> System.out.println("distinct: "+i))
    .filter(i -> i>2)
    .findFirst().ifPresent(i -> System.out.println("found: "+i));

prints

source: 2
distinct: 2
source: 1
distinct: 1
source: 1
source: 3
distinct: 3
found: 3

As explained and demonstrated by Jose Da Silva’s answer, the amount of buffering may change with ordered parallel streams, as partial results must be adjusted before they can get passed to downstream operations.

Since these operations do not happen before the actual terminal operation is known, there are more optimizations possible than currently happen in OpenJDK (but may happen in different implementations or future versions). E.g. sorted().toArray() may use and return the same array or sorted().findFirst() may turn into a min(), etc.
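To make the sorted().findFirst() case concrete, the following sketch compares it with a direct min() call. It only shows that both pipelines yield the same element under natural ordering; it does not show that any particular JDK rewrites one into the other. The class and method names are invented for this example.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class FirstOfSorted {

    // Buffers and sorts all elements, then takes the first one.
    static Optional<Integer> viaSorted(List<Integer> input) {
        return input.stream().sorted().findFirst();
    }

    // Single pass that keeps track of the smallest element only.
    static Optional<Integer> viaMin(List<Integer> input) {
        return input.stream().min(Comparator.naturalOrder());
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(2, 1, 3, 5, 3);
        System.out.println(viaSorted(input).equals(viaMin(input)));
    }
}
```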

Holger answered Oct 12 '22 23:10


According to the javadoc, both the distinct and sorted methods are stateful intermediate operations.

The stream operations documentation (StreamOps) says the following about these operations:

Stateful operations may need to process the entire input before producing a result. For example, one cannot produce any results from sorting a stream until one has seen all elements of the stream. As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data.

However, the stream is only evaluated once the terminal operation (e.g. toArray, collect or forEach) starts; both operations are part of the pipeline and the data flows through them. Still, one important thing to note is the order in which these operations are executed. The javadoc of the distinct() method says:

For ordered streams, the selection of distinct elements is stable (for duplicated elements, the element appearing first in the encounter order is preserved.) For unordered streams, no stability guarantees are made.


For sequential streams, when the stream is already sorted, distinct() only has to compare each element with the previous one; when it is not sorted, a HashSet is used internally instead. For this reason, executing distinct after sorted results in better performance.

(Note: as commented by Eugene, the performance gain may be tiny for these sequential streams, especially when the code is hot, but it still avoids creating that extra temporary HashSet.)
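The reason only the previous element matters on sorted input is that equal elements end up next to each other. A hand-written sketch of that idea follows; it is an illustration, not the JDK's code, and the class name SortedDedup and helper dedupSorted are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class SortedDedup {

    // Hypothetical helper: removes duplicates from an already sorted list.
    // Duplicates are adjacent, so comparing with the previous element is enough
    // and no HashSet is needed.
    static List<Integer> dedupSorted(List<Integer> sorted) {
        List<Integer> out = new ArrayList<>();
        Integer previous = null;
        boolean first = true;
        for (Integer value : sorted) {
            if (first || !Objects.equals(value, previous)) {
                out.add(value);
            }
            previous = value;
            first = false;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(2, 1, 3, 5, 3, 1);
        List<Integer> sorted = input.stream().sorted().collect(Collectors.toList());
        // Same elements as input.stream().sorted().distinct()
        System.out.println(dedupSorted(sorted));
    }
}
```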

Here you can see more about the order of distinct and sort:

Java Streams: How to do an efficient "distinct and sort"?


On the other hand, for parallel streams the doc says:

Preserving stability for distinct() in parallel pipelines is relatively expensive (requires that the operation act as a full barrier, with substantial buffering overhead), and stability is often not needed. Using an unordered stream source (such as generate(Supplier)) or removing the ordering constraint with BaseStream.unordered() may result in significantly more efficient execution for distinct() in parallel pipelines, if the semantics of your situation permit.

A full barrier operation means that:

All the upstream operations must be performed before the downstream can start. There are only two full barrier operations in Stream API: .sorted() (every time) and .distinct() (in ordered parallel case).

For this reason, when using parallel streams the opposite order is normally better (as long as the current stream is unordered), that is, using distinct before sorted, because sorted can start to receive elements while distinct is still being processed.

Using the other order, first sorting (an unordered parallel stream) and then using distinct, puts a barrier in both, since sorted makes the stream ordered again: first all elements have to flow through sorted, then all of them through distinct.

Here is an example:

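// needs java.util.concurrent.ThreadLocalRandom, java.util.concurrent.TimeUnit,
// java.util.concurrent.locks.LockSupport, java.util.function.Function, java.util.function.IntConsumer
Function<String, IntConsumer> process = name ->
        idx -> {
            // wait 0-2 seconds; LockSupport.parkNanos(..) (suggested by Holger) is used
            // instead of TimeUnit.SECONDS.sleep(..) because it throws no checked exception
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(
                    ThreadLocalRandom.current().nextInt(3)));
            System.out.println(name + idx);
        };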

The function above receives a name and returns an IntConsumer that waits 0-2 seconds and then prints the name followed by the element.

IntStream.range(0, 8).parallel() // n > number of cores
        .unordered() // range generates ordered stream (not sorted)
        .peek(process.apply("B"))
        .distinct().peek(process.apply("D"))
        .sorted().peek(process.apply("S"))
        .toArray(); // terminal operation

This will print a mix of B's and D's and then all S's (no barrier in distinct).

If you change the order of sorted and distinct:

        // ... rest
        .sorted().peek(process.apply("S"))
        .distinct().peek(process.apply("D"))
        // ... rest

This will print all B's, then all S's and then all D's (barrier in distinct).

If you want to experiment further, add an unordered() after sorted():

        // ... rest
        .sorted().unordered().peek(process.apply("S"))
        .distinct().peek(process.apply("D"))
        // ... rest

This will print all B's and then a mix of S's and D's (no barrier in distinct again).
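When the encounter order of the result does not matter, the advice from the quoted javadoc (dropping the ordering constraint with unordered() before distinct()) can be applied as in the sketch below. The class and method names are assumptions made for illustration, and collecting into a Set is just one way to make explicit that order is not relied upon.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class UnorderedDistinct {

    // Removes duplicates in parallel without asking distinct() to preserve
    // encounter order; the result is a Set, so no order is promised to callers.
    static Set<Integer> distinctUnordered(List<Integer> input) {
        return input.parallelStream()
                .unordered()
                .distinct()
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(distinctUnordered(Arrays.asList(2, 1, 1, 3, 5, 3, 2)));
    }
}
```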


Edit:

Changed the code a little for a better explanation and to use ThreadLocalRandom.current().nextInt(3) as suggested.

Jose Da Silva answered Oct 13 '22 00:10