Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Concatenating parallel streams

Suppose that I have two int[] arrays input1 and input2. I want to take only positive numbers from the first one, take distinct numbers from the second one, merge them together, sort and store into the resulting array. This can be performed using streams:

int[] result = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                   Arrays.stream(input2).distinct()).sorted().toArray();

I want to speed up the task, so I consider to make the stream parallel. Usually this just means that I can insert .parallel() anywhere between the stream construction and terminal operation and the result will be the same. The JavaDoc for IntStream.concat says that the resulting stream will be parallel if any of the input streams is parallel. So I thought that making parallel() either input1 stream or input2 stream or the concatenated stream will produce the same result.

Actually I was wrong: if I add .parallel() to the resulting stream, it seems that the input streams remain sequential. Moreover, I can mark the input streams (either of them or both) as .parallel(), then turn the resulting stream to .sequential(), but the input remains parallel. So actually there are 8 possibilities: either of input1, input2 and concatenated stream can be parallel or not:

int[] sss = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sorted().toArray();
int[] ssp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0),
                Arrays.stream(input2).distinct()).parallel().sorted().toArray();
int[] sps = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] spp = IntStream.concat(Arrays.stream(input1).filter(x -> x > 0), 
                Arrays.stream(input2).parallel().distinct()).sorted().toArray();
int[] pss = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sequential().sorted().toArray();
int[] psp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).distinct()).sorted().toArray();
int[] pps = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).parallel().distinct()).sequential().sorted().toArray();
int[] ppp = IntStream.concat(Arrays.stream(input1).parallel().filter(x -> x > 0),
                Arrays.stream(input2).parallel().distinct()).sorted().toArray();

I benchmarked all the versions for different input sizes (using JDK 8u45 64bit on Core i5 4xCPU, Win7) and got different results for every case:

Benchmark           (n)  Mode  Cnt       Score       Error  Units
ConcatTest.SSS      100  avgt   20       7.094 ±     0.069  us/op
ConcatTest.SSS    10000  avgt   20    1542.820 ±    22.194  us/op
ConcatTest.SSS  1000000  avgt   20  350173.723 ±  7140.406  us/op
ConcatTest.SSP      100  avgt   20       6.176 ±     0.043  us/op
ConcatTest.SSP    10000  avgt   20     907.855 ±     8.448  us/op
ConcatTest.SSP  1000000  avgt   20  264193.679 ±  6744.169  us/op
ConcatTest.SPS      100  avgt   20      16.548 ±     0.175  us/op
ConcatTest.SPS    10000  avgt   20    1831.569 ±    13.582  us/op
ConcatTest.SPS  1000000  avgt   20  500736.204 ± 37932.197  us/op
ConcatTest.SPP      100  avgt   20      23.871 ±     0.285  us/op
ConcatTest.SPP    10000  avgt   20    1141.273 ±     9.310  us/op
ConcatTest.SPP  1000000  avgt   20  400582.847 ± 27330.492  us/op
ConcatTest.PSS      100  avgt   20       7.162 ±     0.241  us/op
ConcatTest.PSS    10000  avgt   20    1593.332 ±     7.961  us/op
ConcatTest.PSS  1000000  avgt   20  383920.286 ±  6650.890  us/op
ConcatTest.PSP      100  avgt   20       9.877 ±     0.382  us/op
ConcatTest.PSP    10000  avgt   20     883.639 ±    13.596  us/op
ConcatTest.PSP  1000000  avgt   20  257921.422 ±  7649.434  us/op
ConcatTest.PPS      100  avgt   20      16.412 ±     0.129  us/op
ConcatTest.PPS    10000  avgt   20    1816.782 ±    10.875  us/op
ConcatTest.PPS  1000000  avgt   20  476311.713 ± 19154.558  us/op
ConcatTest.PPP      100  avgt   20      23.078 ±     0.622  us/op
ConcatTest.PPP    10000  avgt   20    1128.889 ±     7.964  us/op
ConcatTest.PPP  1000000  avgt   20  393699.222 ± 56397.445  us/op

From these results I can only conclude that parallelization of distinct() step reduces the overall performance (at least in my tests).

So I have the following questions:

  1. Are there any official guidelines on how to better use the parallelization with concatenated streams? It's not always feasible to test all possible combinations (especially when concatenating more than two streams), so having some "rules of thumb" would be nice.
  2. Seems that if I concatenate the streams created directly from collection/array (without intermediate operations performed before concatenation), then results do not depend so much on the location of parallel() . Is this true?
  3. Are there any other cases besides concatenation where the result depends on at which point the stream pipeline is parallelized?
like image 778
Tagir Valeev Avatar asked May 26 '15 16:05

Tagir Valeev


1 Answers

The specification precisely describes what you get—when you consider that, unlike other operations, we are not talking about a single pipeline but three distinct Streams which retain their properties independent of the others.

The specification says: “The resulting stream is […] parallel if either of the input streams is parallel.” and that’s what you get; if either input stream is parallel, the resulting stream is parallel (but you can turn it to sequential afterwards). But changing the resulting stream to parallel or sequential does not change the nature of the input streams nor does feeding a parallel and a sequential stream into concat.

Regarding the performance consequences, consult the documentation, paragraph “Stream operations and pipelines”:

Intermediate operations are further divided into stateless and stateful operations. Stateless operations, such as filter and map, retain no state from previously seen element when processing a new element -- each element can be processed independently of operations on other elements. Stateful operations, such as distinct and sorted, may incorporate state from previously seen elements when processing new elements.

Stateful operations may need to process the entire input before producing a result. For example, one cannot produce any results from sorting a stream until one has seen all elements of the stream. As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel, with minimal data buffering.

You have chosen the very two named stateful operations and combined them. So the .sorted() operation of the resulting stream requires a buffering of the entire content before it can start the sorting which implies a completion of the distinct operation. The distinct operation is obviously hard to parallelize as the threads have to synchronize about the already seen values.

So to answer you first question, it’s not about concat but simply that distinct doesn’t benefit from parallel execution.

This also renders your second question obsolete as your are performing entirely different operations in the two concatenated streams so you can’t do the same with a pre-concatenated collection/array. Concatenating the arrays and running distinct on the resulting array is unlikely to yield better results.

Regarding your third question, flatMap’s behavior regarding parallel streams may be a source of surprises…

like image 114
Holger Avatar answered Sep 18 '22 15:09

Holger