
Will parallel stream work fine with distinct operation?

I was reading about statelessness and came across this in the docs:

Stream pipeline results may be nondeterministic or incorrect if the behavioral parameters to the stream operations are stateful. A stateful lambda (or other object implementing the appropriate functional interface) is one whose result depends on any state which might change during the execution of the stream pipeline.

Now suppose I have a list of strings (say strList) and try to remove duplicate strings from it using parallel streams in the following way:

List<String> resultOne = strList.parallelStream().distinct().collect(Collectors.toList());

or, if we want it to be case-insensitive:

List<String> result2 = strList.parallelStream().map(String::toLowerCase)
                       .distinct().collect(Collectors.toList());

Can this code have any problems, given that parallel streams split the input and being distinct within one chunk does not necessarily mean being distinct in the whole input?

EDIT (Quick summary of the answers below)

distinct is a stateful operation, and with stateful intermediate operations parallel streams may require multiple passes over the data or substantial buffering. distinct can also be implemented more efficiently if the ordering of elements is not relevant. Also, as per the docs:

For ordered streams, the selection of distinct elements is stable (for duplicated elements, the element appearing first in the encounter order is preserved.) For unordered streams, no stability guarantees are made.

But for an ordered stream running in parallel, distinct may be unstable: it may keep an arbitrary element among duplicates, and not necessarily the first one as would otherwise be expected from distinct.

From the link:

Internally, the distinct() operation keeps a Set that contains elements that have been seen previously, but it’s buried inside the operation and we can’t get to it from application code.

So for parallel streams it would probably consume the entire stream, or it may use a CHM (something like ConcurrentHashMap.newKeySet()). For ordered ones it would most likely use a LinkedHashSet or a similar construct.

asked Dec 06 '18 05:12 by akhil_mittal



2 Answers

Roughly pointing out the relevant parts of the doc (Emphasis, mine):

Intermediate operations are further divided into stateless and stateful operations. Stateless operations, such as filter and map, retain no state from previously seen element when processing a new element -- each element can be processed independently of operations on other elements. Stateful operations, such as distinct and sorted, may incorporate state from previously seen elements when processing new elements

Stateful operations may need to process the entire input before producing a result. For example, one cannot produce any results from sorting a stream until one has seen all elements of the stream. As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel, with minimal data buffering

If you read further down (section on ordering):

Streams may or may not have a defined encounter order. Whether or not a stream has an encounter order depends on the source and the intermediate operations. Certain stream sources (such as List or arrays) are intrinsically ordered, whereas others (such as HashSet) are not. Some intermediate operations, such as sorted(), may impose an encounter order on an otherwise unordered stream, and others may render an ordered stream unordered, such as BaseStream.unordered(). Further, some terminal operations may ignore encounter order, such as forEach().

...

For parallel streams, relaxing the ordering constraint can sometimes enable more efficient execution. Certain aggregate operations, such as filtering duplicates (distinct()) or grouped reductions (Collectors.groupingBy()) can be implemented more efficiently if ordering of elements is not relevant. Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism. In cases where the stream has an encounter order, but the user does not particularly care about that encounter order, explicitly de-ordering the stream with unordered() may improve parallel performance for some stateful or terminal operations. However, most stream pipelines, such as the "sum of weight of blocks" example above, still parallelize efficiently even under ordering constraints.

In conclusion,

  • distinct will work fine with parallel streams, but as you may already know, it may have to consume the entire stream before continuing, and this may use a lot of memory.
  • If the source of the items is an unordered collection (such as a HashSet) or the stream is unordered(), then distinct does not have to worry about the order of the output and can therefore be more efficient.

The solution is to add .unordered() to the stream pipeline if you do not care about order and would like better performance.

List<String> result2 = strList.parallelStream()
                              .unordered()
                              .map(String::toLowerCase)
                              .distinct()
                              .collect(Collectors.toList());
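To make the difference concrete, here is a minimal sketch comparing the ordered and the unordered variant. The class and method names are made up for illustration. The ordered variant keeps first occurrences in encounter order, while the unordered one only promises the same set of elements.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;

public class UnorderedDistinct {

    // Ordered source (a List): distinct() is stable, so the first
    // occurrence of each lower-cased string is kept, in encounter order.
    static List<String> orderedDistinct(List<String> input) {
        return input.parallelStream()
                    .map(String::toLowerCase)
                    .distinct()
                    .collect(Collectors.toList());
    }

    // Same pipeline with the ordering constraint dropped. The result has
    // the same elements, but their order in the list is not specified.
    static List<String> unorderedDistinct(List<String> input) {
        return input.parallelStream()
                    .unordered()
                    .map(String::toLowerCase)
                    .distinct()
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> strList = Arrays.asList("B", "a", "b", "A", "c", "C");

        System.out.println(orderedDistinct(strList)); // prints [b, a, c]

        // Compare as sets, because the unordered result may come back in any order.
        boolean sameElements = new HashSet<>(orderedDistinct(strList))
                .equals(new HashSet<>(unorderedDistinct(strList)));
        System.out.println(sameElements); // prints true
    }
}
```

Whether unordered() is measurably faster depends on the data size and the machine, so it is worth benchmarking before relying on it.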

Alas, there is no (available built-in) concurrent HashSet in Java (unless they got clever with ConcurrentHashMap), so I can only leave you with the unfortunate possibility that distinct is implemented in a blocking fashion using a regular Java Set. In that case, I don't see any benefit in doing a parallel distinct.


Edit: I spoke too soon. There might be some benefit with using parallel streams with distinct. It looks like distinct is implemented with more cleverness than I initially thought. See @Eugene's answer.

answered Oct 09 '22 12:10 by smac89


You seem to miss quite a few things from the documentation you provide and the actual example.

Stream pipeline results may be nondeterministic or incorrect if the behavioral parameters to the stream operations are stateful.

In your example, you don't have any stateful operations defined by you. Stateful in the doc means the ones that you define, not the ones that are implemented by the JDK itself, like distinct in your example. But either way, you could define a stateful operation that would be correct; even Stuart Marks, who works at Oracle on Java, provides such an example.

So you are more than OK in the examples that you provide, be it parallel or not.
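For contrast, this is roughly what a stateful behavioral parameter written by you would look like. It is only a sketch with made-up names, and not necessarily the example mentioned above. The predicate remembers what it has seen in a thread-safe set, so it is safe to use in parallel, but which of several case variants gets through is not deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class StatefulPredicate {

    // A stateful predicate: its answer depends on which strings it has
    // already been asked about. The backing set is thread-safe, so
    // concurrent calls from a parallel stream do not corrupt it.
    static Predicate<String> firstSeenIgnoringCase() {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        return s -> seen.add(s.toLowerCase());
    }

    // Keeps one representative per case-insensitive key. WHICH variant
    // ("B" or "b") survives the filter depends on thread timing, so the
    // survivors are lower-cased afterwards to get a stable result.
    static Set<String> distinctIgnoringCase(List<String> input) {
        return input.parallelStream()
                    .filter(firstSeenIgnoringCase())
                    .map(String::toLowerCase)
                    .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<String> strList = Arrays.asList("B", "a", "b", "A", "c");
        System.out.println(distinctIgnoringCase(strList).size()); // prints 3
    }
}
```

Swapping the concurrent set for a plain HashSet would make the same predicate unsafe in a parallel pipeline, which is the kind of problem the quoted warning is about.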

The expensive part of distinct (in parallel) comes from the fact that internally there has to be a thread-safe data structure that keeps the distinct elements; in the JDK's case it is a ConcurrentHashMap when the order does not matter, or a reduction using a LinkedHashSet when order matters.
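The two strategies can be imitated from application code. The following is not the JDK source, just a simplified sketch of the idea with made-up names: one shared concurrent set for the unordered case, and per-chunk LinkedHashSets merged in encounter order for the ordered case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DistinctStrategies {

    // Order does not matter: every worker thread adds into one shared,
    // thread-safe set. (This sketch does not handle null elements,
    // because ConcurrentHashMap rejects null keys.)
    static <T> Set<T> distinctUnordered(List<T> input) {
        Set<T> seen = ConcurrentHashMap.newKeySet();
        input.parallelStream().forEach(seen::add);
        return seen;
    }

    // Order matters: each chunk fills its own LinkedHashSet, and the
    // partial sets are merged left to right, so the first occurrence
    // of every element wins and encounter order is preserved.
    static <T> List<T> distinctOrdered(List<T> input) {
        LinkedHashSet<T> merged = input.parallelStream()
                .collect(LinkedHashSet::new, LinkedHashSet::add, LinkedHashSet::addAll);
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        List<String> strList = Arrays.asList("b", "a", "b", "c", "a");
        System.out.println(distinctOrdered(strList));          // prints [b, a, c]
        System.out.println(distinctUnordered(strList).size()); // prints 3
    }
}
```

The ordered variant has to build and merge intermediate sets, which is where the extra cost under ordering constraints comes from.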

distinct, by the way, is a pretty smart implementation: it checks whether the source of the stream is already distinct (in which case it is a no-op), or whether your data is sorted, in which case it does a slightly smarter traversal of the source (since it knows that once you have seen one element, the next one is either the same as the one you have just seen or a different one), or it uses a ConcurrentHashMap internally, etc.
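The "already distinct" and "sorted" properties are visible from outside as Spliterator characteristics of the source. A small sketch (the class and helper names are made up) showing what different collections report:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;
import java.util.TreeSet;

public class SourceCharacteristics {

    // True if the collection's spliterator promises no duplicates.
    static boolean reportsDistinct(Collection<?> source) {
        return source.spliterator().hasCharacteristics(Spliterator.DISTINCT);
    }

    // True if the collection's spliterator promises sorted encounter order.
    static boolean reportsSorted(Collection<?> source) {
        return source.spliterator().hasCharacteristics(Spliterator.SORTED);
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("b", "a", "b", "c");

        System.out.println(reportsDistinct(new ArrayList<>(values))); // prints false
        System.out.println(reportsDistinct(new HashSet<>(values)));   // prints true
        System.out.println(reportsSorted(new TreeSet<>(values)));     // prints true
    }
}
```

So calling distinct() on a stream created directly from a HashSet or TreeSet has very little left to do, whereas a stream from a List needs the full bookkeeping.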

answered Oct 09 '22 11:10 by Eugene