Suppose that I have two int[] arrays, input1 and input2. I want to take only the positive numbers from the first one, take the distinct numbers from the second one, merge them, sort them, and store the result into an array. This can be done with streams:
int[] result = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
        Arrays.stream(input2).distinct()).sorted().toArray();
I want to speed up the task, so I am considering making the stream parallel. Usually this just means that I can insert .parallel() anywhere between the stream construction and the terminal operation and the result will be the same. The JavaDoc for IntStream.concat says that the resulting stream will be parallel if any of the input streams is parallel. So I thought that calling parallel() on either the input1 stream, the input2 stream, or the concatenated stream would produce the same result.
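For example, for a single pipeline the position of the call indeed doesn't matter; both of the following (just a sketch reusing input1, not part of my real code) run in parallel and produce the same result:

int[] r1 = Arrays.stream(input1).parallel().filter(x -> x > 0).sorted().toArray();
int[] r2 = Arrays.stream(input1).filter(x -> x > 0).parallel().sorted().toArray();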
Actually I was wrong: if I add .parallel() to the resulting stream, the input streams seem to remain sequential. Moreover, I can mark the input streams (either of them or both) as .parallel() and then turn the resulting stream to .sequential(), yet the inputs remain parallel. So there are actually 8 possibilities: each of input1, input2 and the concatenated stream can be parallel or not:
int[] sss = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
        Arrays.stream(input2).distinct()).sorted().toArray();
int[] ssp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
        Arrays.stream(input2).distinct()).parallel().sorted().toArray();
int[] sps = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
        Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] spp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
        Arrays.stream(input2).parallel().distinct()).sorted().toArray();
int[] pss = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
        Arrays.stream(input2).distinct()).sequential().sorted().toArray();
int[] psp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
        Arrays.stream(input2).distinct()).sorted().toArray();
int[] pps = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
        Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] ppp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
        Arrays.stream(input2).parallel().distinct()).sorted().toArray();
I benchmarked all of these versions for different input sizes (using JDK 8u45, 64-bit, on a 4-CPU Core i5, Windows 7) and got different results for every case:
Benchmark            (n)  Mode  Cnt       Score       Error  Units
ConcatTest.SSS       100  avgt   20       7.094 ±     0.069  us/op
ConcatTest.SSS     10000  avgt   20    1542.820 ±    22.194  us/op
ConcatTest.SSS   1000000  avgt   20  350173.723 ±  7140.406  us/op
ConcatTest.SSP       100  avgt   20       6.176 ±     0.043  us/op
ConcatTest.SSP     10000  avgt   20     907.855 ±     8.448  us/op
ConcatTest.SSP   1000000  avgt   20  264193.679 ±  6744.169  us/op
ConcatTest.SPS       100  avgt   20      16.548 ±     0.175  us/op
ConcatTest.SPS     10000  avgt   20    1831.569 ±    13.582  us/op
ConcatTest.SPS   1000000  avgt   20  500736.204 ± 37932.197  us/op
ConcatTest.SPP       100  avgt   20      23.871 ±     0.285  us/op
ConcatTest.SPP     10000  avgt   20    1141.273 ±     9.310  us/op
ConcatTest.SPP   1000000  avgt   20  400582.847 ± 27330.492  us/op
ConcatTest.PSS       100  avgt   20       7.162 ±     0.241  us/op
ConcatTest.PSS     10000  avgt   20    1593.332 ±     7.961  us/op
ConcatTest.PSS   1000000  avgt   20  383920.286 ±  6650.890  us/op
ConcatTest.PSP       100  avgt   20       9.877 ±     0.382  us/op
ConcatTest.PSP     10000  avgt   20     883.639 ±    13.596  us/op
ConcatTest.PSP   1000000  avgt   20  257921.422 ±  7649.434  us/op
ConcatTest.PPS       100  avgt   20      16.412 ±     0.129  us/op
ConcatTest.PPS     10000  avgt   20    1816.782 ±    10.875  us/op
ConcatTest.PPS   1000000  avgt   20  476311.713 ± 19154.558  us/op
ConcatTest.PPP       100  avgt   20      23.078 ±     0.622  us/op
ConcatTest.PPP     10000  avgt   20    1128.889 ±     7.964  us/op
ConcatTest.PPP   1000000  avgt   20  393699.222 ± 56397.445  us/op
From these results I can only conclude that parallelizing the distinct() step reduces the overall performance (at least in my tests).
So I have the following questions:
1. It seems that the parallel mode of each of the three streams matters and cannot be controlled with a single parallel() call. Is this true?
2. For a task like this, would it be better to avoid stream concatenation and work with a pre-concatenated array or collection instead?
3. flatMap is somewhat similar to concat. Does it have similar issues with parallel streams?

The specification precisely describes what you get, once you consider that, unlike other operations, we are not dealing with a single pipeline here but with three distinct Streams which retain their properties independently of one another.

The specification says: “The resulting stream is […] parallel if either of the input streams is parallel.” and that's what you get: if either input stream is parallel, the resulting stream is parallel (though you can turn it sequential afterwards). But changing the resulting stream to parallel or sequential does not change the nature of the input streams, and neither does feeding a parallel and a sequential stream into concat.
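To make this independence visible, here is a minimal sketch (the class and sample values are made up) that only inspects the parallel flags:

import java.util.Arrays;
import java.util.stream.IntStream;

public class ConcatFlagDemo {
    public static void main(String[] args) {
        int[] input1 = {1, -2, 3};
        int[] input2 = {3, 3, 4};

        // each input pipeline keeps its own mode
        IntStream first  = Arrays.stream(input1).filter(x -> x > 0);     // sequential
        IntStream second = Arrays.stream(input2).parallel().distinct();  // parallel

        IntStream merged = IntStream.concat(first, second);
        System.out.println(merged.isParallel());               // true: one input is parallel

        // sequential() changes only the resulting pipeline, not the inputs
        System.out.println(merged.sequential().isParallel());  // false

        System.out.println(Arrays.toString(merged.sorted().toArray()));  // [1, 3, 3, 4]
    }
}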
Regarding the performance consequences, consult the documentation, paragraph “Stream operations and pipelines”:
Intermediate operations are further divided into stateless and stateful operations. Stateless operations, such as filter and map, retain no state from previously seen element when processing a new element -- each element can be processed independently of operations on other elements. Stateful operations, such as distinct and sorted, may incorporate state from previously seen elements when processing new elements.

Stateful operations may need to process the entire input before producing a result. For example, one cannot produce any results from sorting a stream until one has seen all elements of the stream. As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel, with minimal data buffering.
You have chosen exactly the two named stateful operations and combined them. The .sorted() operation of the resulting stream requires buffering of the entire content before it can start sorting, which implies completion of the distinct operation. The distinct operation is obviously hard to parallelize, as the threads have to synchronize on the already seen values.
So to answer your first question: it's not about concat; it's simply that distinct doesn't benefit from parallel execution.
This also renders your second question obsolete, as you are performing entirely different operations on the two concatenated streams, so you can't do the same with a pre-concatenated collection or array. Concatenating the arrays and running distinct on the resulting array is unlikely to yield better results.
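For illustration only, here is a sketch of what such a pre-concatenated variant would look like; note that it computes something different, because the filter and the distinct step can no longer be restricted to their respective inputs:

int[] joined = new int[input1.length + input2.length];
System.arraycopy(input1, 0, joined, 0, input1.length);
System.arraycopy(input2, 0, joined, input1.length, input2.length);
// the x > 0 filter for input1 is gone and distinct() now also removes
// duplicates coming from input1 -- not equivalent to the original task
int[] preConcat = Arrays.stream(joined).distinct().sorted().toArray();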
Regarding your third question, flatMap's behavior with parallel streams may be a source of surprises…
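For completeness, a hedged sketch of how the same merge could be phrased with flatMap; at least in JDK 8 the inner streams are consumed sequentially regardless of their own parallel flag, which is one such surprise:

int[] viaFlatMap = Stream.of(                       // java.util.stream.Stream
        Arrays.stream(input1).filter(x -> x > 0),
        Arrays.stream(input2).distinct())
    .flatMapToInt(s -> s)                           // concatenate the two IntStreams
    .sorted()
    .toArray();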