Let's say we have something like this:
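```java
import java.util.stream.LongStream;

// Streams are lazy: without a terminal operation such as forEach, nothing would run or print.
LongStream.range(0, 10).parallel()
        .filter(l -> {
            System.out.format("filter: %s [%s]\n", l, Thread.currentThread().getName());
            return l % 2 == 0;
        })
        .map(l -> {
            System.out.format("map: %s [%s]\n", l, Thread.currentThread().getName());
            return l;
        })
        .forEach(x -> {});
```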
If you run this program, the output will be something like:
```
filter: 6 [main]
map: 6 [main]
filter: 5 [main]
filter: 4 [ForkJoinPool.commonPool-worker-2]
map: 4 [ForkJoinPool.commonPool-worker-2]
filter: 1 [ForkJoinPool.commonPool-worker-3]
filter: 2 [ForkJoinPool.commonPool-worker-1]
filter: 0 [ForkJoinPool.commonPool-worker-3]
filter: 3 [ForkJoinPool.commonPool-worker-2]
filter: 8 [main]
filter: 7 [ForkJoinPool.commonPool-worker-2]
filter: 9 [ForkJoinPool.commonPool-worker-2]
map: 0 [ForkJoinPool.commonPool-worker-3]
map: 2 [ForkJoinPool.commonPool-worker-1]
map: 8 [main]
```
As we can see, all the operations on a given element are executed by one and the same thread. Is this something we can rely on, or is it just a coincidence? Can threads 'share' tasks during execution?
From the `java.util.stream` package summary, section "Side-effects":

> If the behavioral parameters do have side-effects, unless explicitly stated, there are no guarantees as to the visibility of those side-effects to other threads, nor are there any guarantees that different operations on the "same" element within the same stream pipeline are executed in the same thread.
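In practice this means a later stage should not depend on state that an earlier stage left on the current thread (for example in a `ThreadLocal`). One portable alternative, offered here as a suggestion rather than something the documentation prescribes, is to carry the data with the element itself. In this minimal sketch, `CarryContext`, `Tagged` and `run` are hypothetical names: the square is computed once, attached to the element, and is still available after a stateful `limit` step, whichever thread ends up running `map`.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class CarryContext {

    // Hypothetical holder pairing an element with data produced by an earlier stage.
    static final class Tagged {
        final long value;
        final long square;

        Tagged(long value, long square) {
            this.value = value;
            this.square = square;
        }
    }

    static List<String> run() {
        return LongStream.range(0, 10).parallel()
                // Attach the derived value to the element instead of to the current thread.
                .mapToObj(l -> new Tagged(l, l * l))
                .filter(t -> t.square % 2 == 0)
                .limit(10)                               // stateful step: the thread may change here
                .map(t -> t.value + "^2=" + t.square)    // the context still travels with the element
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints [0^2=0, 2^2=4, 4^2=16, 6^2=36, 8^2=64]
    }
}
```

Because `collect(Collectors.toList())` respects encounter order, the result list is in order even though the stream is parallel; only the side-effect-free data flow matters, not which thread ran which step.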
It's not a coincidence. It's how the Stream API is currently implemented in Oracle JDK/OpenJDK: stateless operations (like `filter`, `map`, `peek` and `flatMap`) are fused together into a single operation that performs the steps sequentially in a single thread. However, introducing a stateful operation might change things. For example, let's add a `limit`:
```java
LongStream.range(0, 10).parallel()
        .filter(l -> {
            System.out.format("filter: %s [%s]\n", l, Thread.currentThread().getName());
            return l % 2 == 0;
        })
        .limit(10)
        .map(l -> {
            System.out.format("map: %s [%s]\n", l, Thread.currentThread().getName());
            return l;
        })
        .forEach(x -> {});
```
Now `limit` introduces a barrier that splits the pipeline into two parts. The result looks like this:
```
filter: 8 [ForkJoinPool.commonPool-worker-2]
filter: 9 [ForkJoinPool.commonPool-worker-7]
filter: 0 [ForkJoinPool.commonPool-worker-6]
filter: 1 [ForkJoinPool.commonPool-worker-3]
filter: 4 [ForkJoinPool.commonPool-worker-5]
filter: 2 [ForkJoinPool.commonPool-worker-1]
filter: 6 [main]
filter: 7 [ForkJoinPool.commonPool-worker-4]
filter: 3 [ForkJoinPool.commonPool-worker-6]
filter: 5 [ForkJoinPool.commonPool-worker-2]
map: 0 [ForkJoinPool.commonPool-worker-6]
map: 2 [ForkJoinPool.commonPool-worker-2]
map: 8 [ForkJoinPool.commonPool-worker-4]
map: 6 [main]
map: 4 [ForkJoinPool.commonPool-worker-6]
```
Here element #2 was filtered in the FJP-1 thread (`ForkJoinPool.commonPool-worker-1`) but mapped in the FJP-2 thread (`ForkJoinPool.commonPool-worker-2`).
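To see why the thread only changes at such a barrier, here is a deliberately simplified model of stage fusion. It is a hypothetical sketch, not the JDK's internal code: `FusionSketch`, `filterStage`, `mapStage` and the other names are made up. Each stateless stage wraps the next one, so pushing one element into the outermost consumer runs `filter` and `map` for that element in one nested call, on whichever thread is doing the pushing. Roughly speaking, a parallel stream in OpenJDK does the same per chunk: each leaf task pushes its own chunk of elements through the chain on its own thread, which `sameThreadPerElement` imitates with two plain threads.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongConsumer;
import java.util.function.LongPredicate;
import java.util.function.LongUnaryOperator;

// Hypothetical, simplified model of stage fusion; not the JDK's internal Sink classes.
class FusionSketch {

    // Wraps `downstream` so that only elements passing `p` are forwarded to it.
    static LongConsumer filterStage(LongPredicate p, LongConsumer downstream) {
        return v -> {
            if (p.test(v)) {
                downstream.accept(v);
            }
        };
    }

    // Wraps `downstream` so that every element is transformed by `f` before being forwarded.
    static LongConsumer mapStage(LongUnaryOperator f, LongConsumer downstream) {
        return v -> downstream.accept(f.applyAsLong(v));
    }

    // Records the event order and which thread ran `stage` for element `v`.
    static void log(String stage, long v, List<String> events, Map<String, String> threads) {
        events.add(stage + " " + v);
        threads.put(stage + " " + v, Thread.currentThread().getName());
    }

    // Builds filter(even) -> map(identity) -> no-op terminal as ONE nested consumer.
    static LongConsumer fusedChain(List<String> events, Map<String, String> threads) {
        LongConsumer terminal = v -> { };   // plays the role of forEach(x -> {})
        LongConsumer map = mapStage(v -> {
            log("map", v, events, threads);
            return v;
        }, terminal);
        return filterStage(v -> {
            log("filter", v, events, threads);
            return v % 2 == 0;
        }, map);
    }

    // Feeds the elements [from, to) one by one into the chain on the current thread.
    static void push(LongConsumer chain, long from, long to) {
        for (long v = from; v < to; v++) {
            chain.accept(v);
        }
    }

    // Single thread: each element passes through all stages before the next one starts.
    static List<String> runSequential(long from, long to) {
        List<String> events = new ArrayList<>();
        push(fusedChain(events, new HashMap<>()), from, to);
        return events;
    }

    // Two "leaf tasks", each pushing its own chunk through its own chain on its own thread.
    // Returns true if every even element was filtered and mapped by the same thread.
    static boolean sameThreadPerElement(long n) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Map<String, String> threads = new ConcurrentHashMap<>();
        long mid = n / 2;
        Thread left = new Thread(() -> push(fusedChain(events, threads), 0, mid), "worker-left");
        Thread right = new Thread(() -> push(fusedChain(events, threads), mid, n), "worker-right");
        left.start();
        right.start();
        try {
            left.join();
            right.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        for (long v = 0; v < n; v += 2) {
            if (!threads.get("filter " + v).equals(threads.get("map " + v))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(runSequential(0, 4));       // prints [filter 0, map 0, filter 1, filter 2, map 2, filter 3]
        System.out.println(sameThreadPerElement(10));  // prints true
    }
}
```

In a parallel run, a stateful operation such as `limit` needs information about other elements (here, their encounter order) before it can pass anything downstream, so it cannot simply sit inside one per-chunk chain like this; that is the barrier visible in the output above.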
As @Misha correctly cited, even for stateless operations there's no guarantee that the same thread will be used. Future or alternative Stream API implementations may change this behavior (for example, by using a producer-consumer approach).
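Because this is implementation behavior rather than a guarantee, it may be worth checking what a particular JDK actually does. The sketch below (`ThreadHopCheck` and `elementsThatChangedThread` are hypothetical names) records which thread ran `filter` and `map` for every element and reports the elements that switched threads, once without and once with `limit(10)`. On current OpenJDK builds the first set is expected to be empty; the second varies from run to run and may also be empty.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.LongStream;

class ThreadHopCheck {

    // Runs the pipeline from the answer (optionally with limit(10)) and returns the elements
    // whose filter step and map step were executed by different threads.
    static Set<Long> elementsThatChangedThread(boolean withLimit) {
        Map<Long, String> filterThread = new ConcurrentHashMap<>();
        Map<Long, String> mapThread = new ConcurrentHashMap<>();

        LongStream s = LongStream.range(0, 10).parallel()
                .filter(l -> {
                    filterThread.put(l, Thread.currentThread().getName());
                    return l % 2 == 0;
                });
        if (withLimit) {
            s = s.limit(10);   // stateful operation: may split the pipeline into two parts
        }
        s.map(l -> {
            mapThread.put(l, Thread.currentThread().getName());
            return l;
        }).forEach(x -> { });

        // The terminal operation has completed, so all recorded thread names are visible here.
        Set<Long> changed = new TreeSet<>();
        mapThread.forEach((l, t) -> {
            if (!t.equals(filterThread.get(l))) {
                changed.add(l);
            }
        });
        return changed;
    }

    public static void main(String[] args) {
        // Results depend on the JDK and on scheduling; the spec guarantees neither.
        System.out.println("without limit: " + elementsThatChangedThread(false));
        System.out.println("with limit:    " + elementsThatChangedThread(true));
    }
}
```

An empty result for the stateless case only describes the JDK it was run on; it does not turn the current behavior into something the API promises.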