I executed the code below:
List.of(1, 2, 3, 4).stream()
    .map(
        integer -> {
            System.out.println(
                "Before parallel operator : " + Thread.currentThread().getName() + " : " + integer);
            return integer * 2;
        })
    .parallel()
    .map(
        integer -> {
            System.out.println(
                " After parallel operator : " + Thread.currentThread().getName() + " : " + integer);
            return integer * 2;
        })
    .forEach(
        integer -> {
            System.out.println(" For Each : " + Thread.currentThread().getName() + " : " + integer);
        });
Output:
Before parallel operator : main : 3
Before parallel operator : ForkJoinPool.commonPool-worker-19 : 2
Before parallel operator : ForkJoinPool.commonPool-worker-23 : 1
Before parallel operator : ForkJoinPool.commonPool-worker-5 : 4
After parallel operator : main : 6
After parallel operator : ForkJoinPool.commonPool-worker-23 : 2
After parallel operator : ForkJoinPool.commonPool-worker-19 : 4
After parallel operator : ForkJoinPool.commonPool-worker-5 : 8
For Each : ForkJoinPool.commonPool-worker-19 : 8
For Each : main : 12
For Each : ForkJoinPool.commonPool-worker-23 : 4
For Each : ForkJoinPool.commonPool-worker-5 : 16
Apart from element 3, did all the other elements run in parallel? I would like to understand how the parallel operator affects the operations before and after it.
Where does the parallel operator kick in and how does the parallelism continue?
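The stream won't be processed until a terminal operation (such as forEach or collect) is called; more on this in a bit. So, to answer your question, "Where does the parallel operator kick in and how does the parallelism continue?": it does not kick in at a particular point. It sets the mode for the whole pipeline, including the operations that come before it.
What do the docs say?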
The documentation is clear about this matter:
the stream's mode can be modified with the BaseStream.sequential() and BaseStream.parallel() operations. The most recent sequential or parallel mode setting applies to the execution of the entire stream pipeline
A Little Demo
Now consider the following piece of code (pardon my System.out, it's for demo purposes). If we switch between parallel and sequential, the mode of the entire pipeline changes, not only that of the subsequent operators.
System.out.println("=== Creating stream s1 as 1,2,3,4");
var s1 = List.of(1, 2, 3, 4).stream();
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("=== s2 results of applying map to s1");
var s2 = s1.map(integer -> integer * 2);
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("=== s3 results of applying parallel to s2");
var s3 = s2.parallel();
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("s3 is parallel? " + s3.isParallel());
System.out.println("=== s4 results of applying map to s3");
var s4 = s3.map(integer -> integer * 2);
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("s3 is parallel? " + s3.isParallel());
System.out.println("s4 is parallel? " + s4.isParallel());
System.out.println("=== s5 results of applying sequential to s4");
var s5 = s4.sequential();
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("s3 is parallel? " + s3.isParallel());
System.out.println("s4 is parallel? " + s4.isParallel());
System.out.println("s5 is parallel? " + s5.isParallel());
This will output the following:
=== Creating stream s1 as 1,2,3,4
s1 is parallel? false
=== s2 results of applying map to s1
s1 is parallel? false
s2 is parallel? false
=== s3 results of applying parallel to s2
s1 is parallel? true
s2 is parallel? true
s3 is parallel? true
=== s4 results of applying map to s3
s1 is parallel? true
s2 is parallel? true
s3 is parallel? true
s4 is parallel? true
=== s5 results of applying sequential to s4
s1 is parallel? false
s2 is parallel? false
s3 is parallel? false
s4 is parallel? false
s5 is parallel? false
Now, when you call a terminal operator like forEach or collect on s5, the whole pipeline runs sequentially, even though parallel was called in the middle. As the documentation states, the most recently applied mode is used for the entire pipeline.
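To make this checkable rather than just readable from isParallel, here is a minimal sketch that records which threads actually run each stage. The class name LastModeWins and the helper threadsUsed are made up for this illustration. It relies on the usual behaviour that a sequential pipeline is evaluated on the thread that invokes the terminal operation.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class LastModeWins {

    // Hypothetical helper: runs a pipeline that calls parallel() and then
    // sequential(), and returns the names of every thread that ran a stage.
    static Set<String> threadsUsed() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        List.of(1, 2, 3, 4).stream()
            .map(i -> { threads.add(Thread.currentThread().getName()); return i * 2; })
            .parallel()
            .map(i -> { threads.add(Thread.currentThread().getName()); return i * 2; })
            .sequential() // most recent mode setting, so it applies to the whole pipeline
            .forEach(i -> threads.add(Thread.currentThread().getName()));
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("Calling thread: " + Thread.currentThread().getName());
        System.out.println("Threads used:   " + threadsUsed());
    }
}
```

If you swap the order so that parallel() comes last, you would typically see ForkJoinPool worker names appear for all three stages, although the exact set of threads varies from run to run.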
How is this useful?
You can still change the behaviour in the middle by "breaking" the pipeline with a terminal operator. For instance, taking your example, if we apply collect right after the first map, the first map is executed sequentially, and parallel then applies only to the subsequent operators. In practice, though, these are now two different pipelines, because everything was collected into a list in the middle.
// requires: import java.util.List; import java.util.stream.Collectors;
List.of(1, 2, 3, 4).stream()
    .map(integer -> {
        System.out.println("Before stream : " + Thread.currentThread().getName() + " : " + integer);
        return integer * 2;
    })
    .collect(Collectors.toList()) // terminal operation: the first (sequential) pipeline ends here
    .stream()                     // a new, independent pipeline starts from the collected list
    .parallel()
    .map(integer -> {
        System.out.println("After parallel stream : " + Thread.currentThread().getName() + " : " + integer);
        return integer * 2;
    })
    .forEach(integer -> System.out.println("For Each : " + Thread.currentThread().getName() + " : " + integer));
This will now output something like:
Before stream : main : 1
Before stream : main : 2
Before stream : main : 3
Before stream : main : 4
After parallel stream : main : 6
After parallel stream : ForkJoinPool.commonPool-worker-23 : 2
After parallel stream : ForkJoinPool.commonPool-worker-5 : 4
After parallel stream : ForkJoinPool.commonPool-worker-19 : 8
For Each : ForkJoinPool.commonPool-worker-19 : 16
For Each : ForkJoinPool.commonPool-worker-5 : 8
For Each : ForkJoinPool.commonPool-worker-23 : 4
For Each : main : 12
Notice how the first map is executed sequentially while the remaining operators are executed in parallel.
Observable stream implementations such as RxJava take a different approach to this with the observeOn operator, but they are a totally different way of doing things as well.