Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Parallel streams, collectors and thread safety

See the simple example below that counts the number of occurences of each word in a list:

Stream<String> words = Stream.of("a", "b", "a", "c"); Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1,                                                       (i, j) -> i + j)); 

At the end, wordsCount is {a=2, b=1, c=1}.

But my stream is very large and I want to parallelise the job, so I write:

Map<String, Integer> wordsCount = words.parallel()                                        .collect(toMap(s -> s, s -> 1,                                                       (i, j) -> i + j)); 

However I have noticed that wordsCount is a simple HashMap so I wonder if I need to explicitly ask for a concurrent map to ensure thread safety:

Map<String, Integer> wordsCount = words.parallel()                                        .collect(toConcurrentMap(s -> s, s -> 1,                                                                 (i, j) -> i + j)); 

Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?

like image 535
assylias Avatar asked Mar 12 '14 11:03

assylias


People also ask

Is stream collect thread safe?

They can be used safely with parallel streams, provided your collectors don't interfere with the stream source, are side-effect free, order independent, etc.

When would you not use parallel streams?

Similarly, don't use parallel if the stream is ordered and has much more elements than you want to process, e.g. This may run much longer because the parallel threads may work on plenty of number ranges instead of the crucial one 0-100, causing this to take very long time.

Does parallel stream create threads?

In case of Parallel stream,4 threads are spawned simultaneously and it internally using Fork and Join pool to create and manage threads.

What is true about parallel streams?

Parallel streams enable us to execute code in parallel on separate cores. The final result is the combination of each individual outcome.


2 Answers

Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?

It is safe to use a non-concurrent collector in a collect operation of a parallel stream.

In the specification of the Collector interface, in the section with half a dozen bullet points, is this:

For non-concurrent collectors, any result returned from the result supplier, accumulator, or combiner functions must be serially thread-confined. This enables collection to occur in parallel without the Collector needing to implement any additional synchronization. The reduction implementation must manage that the input is properly partitioned, that partitions are processed in isolation, and combining happens only after accumulation is complete.

This means that the various implementations provided by the Collectors class can be used with parallel streams, even though some of those implementations might not be concurrent collectors. This also applies to any of your own non-concurrent collectors that you might implement. They can be used safely with parallel streams, provided your collectors don't interfere with the stream source, are side-effect free, order independent, etc.

I also recommend reading the Mutable Reduction section of the java.util.stream package documentation. In the middle of this section is an example that is stated to be parallelizable, but which collects results into an ArrayList, which is not thread-safe.

The way this works is that a parallel stream ending in a non-concurrent collector makes sure that different threads are always operating on different instances of the intermediate result collections. That's why a collector has a Supplier function, for creating as many intermediate collections as there are threads, so each thread can accumulate into its own. When intermediate results are to be merged, they are handed off safely between threads, and at any given time only a single thread is merging any pair of intermediate results.

like image 169
Stuart Marks Avatar answered Oct 08 '22 13:10

Stuart Marks


All collectors, if they follow the rules in the specification, are safe to run in parallel or sequential. Parallel-readiness is a key part of the design here.

The distinction between concurrent and non-concurrent collectors have to do with the approach to parallelization.

An ordinary (non-concurrent) collector operates by merging sub-results. So the source is partitioned into a bunch of chunks, each chunk is collected into a result container (like a list or a map), and then the sub-results are merged into a bigger result container. This is safe and order-preserving, but for some kinds of containers -- especially maps -- can be expensive, since merging two maps by key is often expensive.

A concurrent collector instead creates one result container, whose insertion operations are guaranteed to be thread-safe, and blasts elements into it from multiple threads. With a highly concurrent result container like ConcurrentHashMap, this approach may well perform better than merging ordinary HashMaps.

So, the concurrent collectors are strictly optimizations over their ordinary counterparts. And they don't come without a cost; because elements are being blasted in from many threads, concurrent collectors generally cannot preserve encounter order. (But, often you don't care -- when creating a word count histogram, you don't care which instance of "foo" you counted first.)

like image 22
Brian Goetz Avatar answered Oct 08 '22 12:10

Brian Goetz