See the simple example below that counts the number of occurences of each word in a list:
Stream<String> words = Stream.of("a", "b", "a", "c"); Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1, (i, j) -> i + j));
At the end, wordsCount
is {a=2, b=1, c=1}
.
But my stream is very large and I want to parallelise the job, so I write:
Map<String, Integer> wordsCount = words.parallel() .collect(toMap(s -> s, s -> 1, (i, j) -> i + j));
However I have noticed that wordsCount
is a simple HashMap
so I wonder if I need to explicitly ask for a concurrent map to ensure thread safety:
Map<String, Integer> wordsCount = words.parallel() .collect(toConcurrentMap(s -> s, s -> 1, (i, j) -> i + j));
Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?
They can be used safely with parallel streams, provided your collectors don't interfere with the stream source, are side-effect free, order independent, etc.
Similarly, don't use parallel if the stream is ordered and has much more elements than you want to process, e.g. This may run much longer because the parallel threads may work on plenty of number ranges instead of the crucial one 0-100, causing this to take very long time.
In case of Parallel stream,4 threads are spawned simultaneously and it internally using Fork and Join pool to create and manage threads.
Parallel streams enable us to execute code in parallel on separate cores. The final result is the combination of each individual outcome.
Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?
It is safe to use a non-concurrent collector in a collect
operation of a parallel stream.
In the specification of the Collector
interface, in the section with half a dozen bullet points, is this:
For non-concurrent collectors, any result returned from the result supplier, accumulator, or combiner functions must be serially thread-confined. This enables collection to occur in parallel without the Collector needing to implement any additional synchronization. The reduction implementation must manage that the input is properly partitioned, that partitions are processed in isolation, and combining happens only after accumulation is complete.
This means that the various implementations provided by the Collectors
class can be used with parallel streams, even though some of those implementations might not be concurrent collectors. This also applies to any of your own non-concurrent collectors that you might implement. They can be used safely with parallel streams, provided your collectors don't interfere with the stream source, are side-effect free, order independent, etc.
I also recommend reading the Mutable Reduction section of the java.util.stream package documentation. In the middle of this section is an example that is stated to be parallelizable, but which collects results into an ArrayList
, which is not thread-safe.
The way this works is that a parallel stream ending in a non-concurrent collector makes sure that different threads are always operating on different instances of the intermediate result collections. That's why a collector has a Supplier
function, for creating as many intermediate collections as there are threads, so each thread can accumulate into its own. When intermediate results are to be merged, they are handed off safely between threads, and at any given time only a single thread is merging any pair of intermediate results.
All collectors, if they follow the rules in the specification, are safe to run in parallel or sequential. Parallel-readiness is a key part of the design here.
The distinction between concurrent and non-concurrent collectors have to do with the approach to parallelization.
An ordinary (non-concurrent) collector operates by merging sub-results. So the source is partitioned into a bunch of chunks, each chunk is collected into a result container (like a list or a map), and then the sub-results are merged into a bigger result container. This is safe and order-preserving, but for some kinds of containers -- especially maps -- can be expensive, since merging two maps by key is often expensive.
A concurrent collector instead creates one result container, whose insertion operations are guaranteed to be thread-safe, and blasts elements into it from multiple threads. With a highly concurrent result container like ConcurrentHashMap, this approach may well perform better than merging ordinary HashMaps.
So, the concurrent collectors are strictly optimizations over their ordinary counterparts. And they don't come without a cost; because elements are being blasted in from many threads, concurrent collectors generally cannot preserve encounter order. (But, often you don't care -- when creating a word count histogram, you don't care which instance of "foo" you counted first.)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With