 

Can a Stream be sequentially processed for part of the pipeline, and then as parallel?

I have the following code which does not work as I intended (a random line, instead of the first, is skipped):

Files.lines(path)
     .skip(1)
     .parallel()
     .forEach( System.out::println );

I have a feeling I misunderstood the behavior of Streams. The question is: Can I first treat a stream as sequential (and use "stateful intermediate operations") and then feed it into a parallel forEach?

Aleksandr Dubinsky asked Dec 17 '13 16:12


3 Answers

The entire pipeline is either parallel or sequential.
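Because one mode covers the whole pipeline, a common way to get "sequential first, then parallel" is to split the work into two pipelines: finish the sequential, stateful part with a terminal operation, then start a new parallel stream from the intermediate result. A minimal sketch, assuming the lines fit in memory; the class name `SplitPipeline` and the `toUpperCase` step are hypothetical stand-ins for your own code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SplitPipeline {
    static List<String> process(List<String> lines) {
        // Pipeline 1 is sequential; collect() ends it, so skip(1) is fully
        // evaluated (and always drops the first element) before anything else runs.
        List<String> body = lines.stream()
                .skip(1)
                .collect(Collectors.toList());

        // Pipeline 2 is a brand-new stream and can be parallel on its own.
        // toUpperCase is a hypothetical placeholder for the real per-line work.
        return body.parallelStream()
                .map(String::toUpperCase)
                .collect(Collectors.toList()); // collect keeps encounter order
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("header", "a", "b", "c");
        System.out.println(process(lines)); // [A, B, C]
    }
}
```

The cost of this design is materializing the intermediate list; in exchange, the stateful step no longer interacts with parallel execution at all.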

Try using forEachOrdered instead of forEach. In my test, it skips the first line when forEachOrdered is used (with forEach, it skips the last line).

forEach ignores encounter order, and it seems that it can also cause other operations to ignore it.
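A small sketch of the forEachOrdered suggestion, using an in-memory list instead of a file so it runs without any setup (the class and method names are made up for illustration). It keeps the question's `skip(1).parallel()` shape and only swaps the terminal operation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class OrderedForEach {
    static List<String> skipFirstOrdered(List<String> lines) {
        List<String> seen = new ArrayList<>();
        // forEachOrdered runs the action in encounter order, and the action for
        // one element happens-before the action for the next, so a plain
        // ArrayList is safe to use here even though the stream is parallel.
        lines.stream()
             .skip(1)
             .parallel()
             .forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<String> lines = IntStream.range(0, 10)
                .mapToObj(i -> "line " + i)
                .collect(Collectors.toList());
        // Prints "line 1" through "line 9", one per line.
        skipFirstOrdered(lines).forEach(System.out::println);
    }
}
```

Note that forEachOrdered may give up much of the benefit of parallelism for the terminal step, since the actions are serialized in order.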

x22 answered Sep 20 '22 15:09


It's not a bug, but a feature. Calling parallel() makes the whole stream parallel, unless a subsequent call to sequential() is made, which sets the whole stream back to sequential mode.

The javadoc says:

Returns an equivalent stream that is parallel.
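One way to see that the mode belongs to the whole pipeline, not to the stages after the call, is to check isParallel() and record which threads run an early stage. This is a sketch; the class name and the use of peek for thread tracking are illustrative choices, not anything from the javadoc:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

class LastModeWins {
    // Returns the names of the threads that executed the peek stage of a
    // pipeline that calls parallel() and then sequential().
    static Set<String> threadsUsed() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Stream.of("a", "b", "c", "d", "e", "f", "g", "h")
              .peek(s -> threads.add(Thread.currentThread().getName()))
              .skip(1)
              .parallel()
              .sequential() // the last call decides the mode of the WHOLE pipeline
              .forEach(s -> { });
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(1, 2, 3).parallel().isParallel());              // true
        System.out.println(Stream.of(1, 2, 3).parallel().sequential().isParallel()); // false
        // Only the calling thread appears: every stage ran sequentially.
        System.out.println(threadsUsed());
    }
}
```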

JB Nizet answered Sep 21 '22 15:09


No, you cannot do that. However, your code should probably work as intended. From the Stream.skip javadoc:

While skip() is generally a cheap operation on sequential stream pipelines, it can be quite expensive on ordered parallel pipelines, especially for large values of n, since skip(n) is constrained to skip not just any n elements, but the first n elements in the encounter order. Using an unordered stream source (such as generate(Supplier)) or removing the ordering constraint with BaseStream.unordered() may result in significant speedups of skip() in parallel pipelines, if the semantics of your situation permit. If consistency with encounter order is required, and you are experiencing poor performance or memory utilization with skip() in parallel pipelines, switching to sequential execution with BaseStream.sequential() may improve performance.
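The difference the javadoc describes can be observed with an IntStream, which is ordered. In this sketch (class and method names are hypothetical), the ordered parallel pipeline always drops the first element, while after unordered() exactly one element is still dropped but which one is left unspecified:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SkipOrdering {
    // Ordered parallel pipeline: skip(1) must drop the first element in encounter order.
    static List<Integer> orderedSkip(int n) {
        return IntStream.range(0, n).parallel()
                .skip(1)
                .boxed()
                .collect(Collectors.toList());
    }

    // unordered() lifts that constraint: one element is dropped, but which
    // one is unspecified and may differ between runs.
    static List<Integer> unorderedSkip(int n) {
        return IntStream.range(0, n).parallel()
                .unordered()
                .skip(1)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(orderedSkip(10).get(0));   // 1
        System.out.println(unorderedSkip(10).size()); // 9
    }
}
```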

Whether your code works or not depends on the nature of the stream returned by Files.lines(..), specifically on whether that stream is ORDERED. This characteristic is set by the Spliterator that backs the stream: if the stream is ordered, skip(1) will always skip the first element; if the stream is unordered, it will skip one element, but not necessarily the first.
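To check this on a given JDK, you can ask the stream's Spliterator directly. The sketch below writes a throwaway temp file (an assumption standing in for the question's `path`), reports whether `Files.lines` is backed by an ORDERED Spliterator, and then runs `skip(1)` in parallel. On the JDK versions I'm aware of, the source is ORDERED, so the header line is the one dropped; treat that as something to verify rather than a documented guarantee:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LinesOrdering {
    // Reports whether the Spliterator behind Files.lines(path) is ORDERED.
    static boolean linesAreOrdered(Path path) throws IOException {
        try (Stream<String> lines = Files.lines(path)) {
            return lines.spliterator().hasCharacteristics(Spliterator.ORDERED);
        }
    }

    // With an ordered source, skip(1) drops the first line even in parallel.
    static List<String> dropHeader(Path path) throws IOException {
        try (Stream<String> lines = Files.lines(path)) {
            return lines.skip(1).parallel().collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("lines", ".txt"); // hypothetical input file
        try {
            Files.write(tmp, Arrays.asList("header", "a", "b", "c"));
            System.out.println(linesAreOrdered(tmp));
            System.out.println(dropHeader(tmp)); // [a, b, c]
        } finally {
            Files.delete(tmp);
        }
    }
}
```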

http://download.java.net/jdk8/docs/api/java/util/Spliterator.html

aepurniet answered Sep 23 '22 15:09