I'm processing data using Java 8 lambdas with parallel streams. My code is as follows:
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
List<String> files = Arrays.asList(new String[]{"1.txt"});
List<String> result = forkJoinPool.submit(() ->
    files.stream().parallel()
        .flatMap(x -> stage1(x)) // this stage adds more elements to the stream
        .map(x -> stage2(x))
        .map(x -> stage3(x))
        .collect(Collectors.toList())
).get();
The stream starts with one element, but more elements are added by the flatMap stage. My assumption was that this stream would run in parallel, but in this case only one worker thread is used.
If I start with 2 elements (i.e. I add a second element to the initial list), then 2 threads are spawned to handle the stream, and so on. This also happens if I don't explicitly submit the stream to the ForkJoinPool.
The question is: is this documented behaviour, or might it change in the implementation? Is there any way to control this behaviour and allow more threads regardless of the size of the initial list?
What you're observing is implementation-specific behavior, not specified behavior.
The current JDK 8 implementation looks at the outermost stream's Spliterator and uses that as the basis for splitting the parallel workload. Since the example has only a single element in the original source stream, it can't be split, and the stream runs single-threaded. This works well for the common (but by no means only) case where flatMap returns zero, one, or just a handful of elements, but in the case where it returns a lot of elements, they're all handled sequentially. In fact, the stream returned by the flatMap function is forced into sequential mode. See line 270 of ReferencePipeline.java.
The "obvious" thing to do is to make this stream parallel, or at least not force it to be sequential. This might or might not improve things. Most likely it'll improve some things but make other things worse. A better policy is certainly called for here, but I'm not sure what it would look like.
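In the meantime, one possible workaround (a sketch, not something the JDK prescribes) is to split the pipeline in two: run the flatMap expansion first and collect its output into a list, then start a new parallel stream from that list so the splitting is based on the expanded elements rather than the one-element source. The class name TwoPhasePipeline and the bodies of stage1, stage2, and stage3 below are hypothetical stand-ins for the methods in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class TwoPhasePipeline {

    // Hypothetical stand-in for stage1: expands one file name into many records.
    static Stream<String> stage1(String file) {
        return IntStream.range(0, 1000).mapToObj(i -> file + "#" + i);
    }

    // Hypothetical stand-ins for the per-element stages.
    static String stage2(String s) {
        return s.toUpperCase(Locale.ROOT);
    }

    static String stage3(String s) {
        return s + "!";
    }

    static List<String> process(List<String> files) {
        // Phase 1: expand the (small) source and materialize the result.
        // An ArrayList is requested explicitly because it splits evenly.
        List<String> expanded = files.stream()
                .flatMap(TwoPhasePipeline::stage1)
                .collect(Collectors.toCollection(ArrayList::new));

        // Phase 2: a fresh parallel stream whose source is the expanded list,
        // so the workload can be split across the expanded elements.
        return expanded.parallelStream()
                .map(TwoPhasePipeline::stage2)
                .map(TwoPhasePipeline::stage3)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> result = process(Arrays.asList("1.txt"));
        System.out.println(result.size());
    }
}
```

The trade-off is that the expansion phase itself still runs on one thread when the source has a single element, and every intermediate element is held in memory at once, so this only helps when stage2 and stage3 are the expensive part and the expanded data fits in memory.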
Also note that the technique used for forcing the parallel stream to run in a fork-join pool of your choosing, by submitting to it a task that runs the pipeline, is also implementation-specific behavior. It works this way in JDK 8, but it might change in the future.