
Do Java 8 parallel streams use the same thread for a sequence?

Let's say we have something like this:

LongStream.range(0, 10).parallel()
.filter(l -> {
  System.out.format("filter: %s [%s]\n", l, Thread.currentThread().getName());
  return l % 2 == 0;
})
.map(l -> {
  System.out.format("map:    %s [%s]\n", l, Thread.currentThread().getName());
  return l;
})
.forEach(l -> {}); // terminal operation: without one, the lazy pipeline never executes

If you run this program, the output would be something like:

filter: 6 [main]
map:    6 [main]
filter: 5 [main]
filter: 4 [ForkJoinPool.commonPool-worker-2]
map:    4 [ForkJoinPool.commonPool-worker-2]
filter: 1 [ForkJoinPool.commonPool-worker-3]
filter: 2 [ForkJoinPool.commonPool-worker-1]
filter: 0 [ForkJoinPool.commonPool-worker-3]
filter: 3 [ForkJoinPool.commonPool-worker-2]
filter: 8 [main]
filter: 7 [ForkJoinPool.commonPool-worker-2]
filter: 9 [ForkJoinPool.commonPool-worker-2]
map:    0 [ForkJoinPool.commonPool-worker-3]
map:    2 [ForkJoinPool.commonPool-worker-1]
map:    8 [main]

As we can see, the whole sequence of operations for each element is executed by exactly one and the same thread. Is this something we can rely on, or is it just a coincidence? Can threads 'share' tasks during execution?

Konstantin Kulagin asked Apr 13 '16 20:04





2 Answers

From the java.util.stream package summary, section on Side-effects:

If the behavioral parameters do have side-effects, unless explicitly stated, there are no guarantees as to the visibility of those side-effects to other threads, nor are there any guarantees that different operations on the "same" element within the same stream pipeline are executed in the same thread.
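One practical consequence: per-element state should travel with the element itself, not through thread-confined storage such as a ThreadLocal written in filter and read back in map. The sketch below assumes Java 16+ (for the record syntax); the Scored record, the score function and the run method are hypothetical illustrations, not part of the quoted documentation.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class CarryStateInElement {

    // Hypothetical carrier: the original value plus a derived result that a later
    // stage needs. It travels with the element, so thread assignment is irrelevant.
    record Scored(long value, long score) {}

    // Hypothetical "expensive" computation we do not want to repeat per stage.
    static long score(long l) {
        return l * l;
    }

    static List<String> run() {
        // Anti-pattern avoided here: computing the score in filter(), stashing it in
        // a ThreadLocal and reading it back in map(). That would rely on filter and
        // map running on the same thread for the same element, which is not guaranteed.
        return LongStream.range(0, 10).parallel()
                .mapToObj(l -> new Scored(l, score(l)))   // compute once, attach to element
                .filter(s -> s.score() % 2 == 0)           // use it to filter
                .map(s -> s.value() + " -> " + s.score())  // reuse it without recomputing
                .collect(Collectors.toList());             // keeps encounter order
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because collect on an ordered stream preserves encounter order, this prints [0 -> 0, 2 -> 4, 4 -> 16, 6 -> 36, 8 -> 64] no matter which worker threads ran which stage.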

Misha answered Oct 21 '22 18:10



It's not a coincidence; it's how the Stream API is currently implemented in OracleJDK/OpenJDK: stateless operations (like filter, map, peek and flatMap) are fused together into a single operation that performs the steps sequentially in a single thread. However, introducing a stateful operation might change things. For example, let's add a limit:

LongStream.range(0, 10).parallel()
.filter(l -> {
  System.out.format("filter: %s [%s]\n", l, Thread.currentThread().getName());
  return l % 2 == 0;
})
.limit(10)
.map(l -> {
  System.out.format("map:    %s [%s]\n", l, Thread.currentThread().getName());
  return l;
})
.forEach(x -> {});

Now limit introduces a barrier that splits the pipeline into two parts. The result looks like this:

filter: 8 [ForkJoinPool.commonPool-worker-2]
filter: 9 [ForkJoinPool.commonPool-worker-7]
filter: 0 [ForkJoinPool.commonPool-worker-6]
filter: 1 [ForkJoinPool.commonPool-worker-3]
filter: 4 [ForkJoinPool.commonPool-worker-5]
filter: 2 [ForkJoinPool.commonPool-worker-1]
filter: 6 [main]
filter: 7 [ForkJoinPool.commonPool-worker-4]
filter: 3 [ForkJoinPool.commonPool-worker-6]
filter: 5 [ForkJoinPool.commonPool-worker-2]
map:    0 [ForkJoinPool.commonPool-worker-6]
map:    2 [ForkJoinPool.commonPool-worker-2]
map:    8 [ForkJoinPool.commonPool-worker-4]
map:    6 [main]
map:    4 [ForkJoinPool.commonPool-worker-6]

Notice that element 2 was filtered in the ForkJoinPool.commonPool-worker-1 thread but mapped in the ForkJoinPool.commonPool-worker-2 thread.
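Rather than eyeballing interleaved output, one can record which thread ran each stage for each element and count the mismatches. This is a minimal diagnostic sketch; the class and method names are hypothetical, and it uses thread-safe side effects (ConcurrentHashMap) purely for observation. The counts depend on the JDK version, core count and scheduling, so no particular number should be expected: with the fused implementation described above the no-limit run would be expected to report 0, while the limit run may or may not report mismatches on a given machine.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.LongStream;

public class ThreadPerStageCheck {

    // Runs filter -> (optional limit) -> map in parallel, records which thread
    // executed each stage for each element, and returns how many elements were
    // filtered and mapped on different threads.
    static long countMismatches(boolean withLimit) {
        Map<Long, String> filterThread = new ConcurrentHashMap<>();
        Map<Long, String> mapThread = new ConcurrentHashMap<>();

        LongStream s = LongStream.range(0, 10_000).parallel()
                .filter(l -> {
                    filterThread.put(l, Thread.currentThread().getName());
                    return l % 2 == 0;
                });
        if (withLimit) {
            s = s.limit(10_000); // stateful op: acts as a barrier in the pipeline
        }
        s.map(l -> {
                    mapThread.put(l, Thread.currentThread().getName());
                    return l;
                })
                .forEach(l -> { }); // terminal op triggers execution

        // Only even elements reach map; compare their two recorded thread names.
        return mapThread.entrySet().stream()
                .filter(e -> !e.getValue().equals(filterThread.get(e.getKey())))
                .count();
    }

    public static void main(String[] args) {
        System.out.println("without limit: " + countMismatches(false));
        System.out.println("with limit:    " + countMismatches(true));
    }
}
```

Like the printed output above, a zero count on one run says nothing about what is guaranteed; it only describes what this particular implementation did.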

Note that, as @Misha correctly cited, even for stateless operations there's no guarantee that the same thread will be used. It's possible that future or alternative Stream API implementations will change this behavior (for example, by using a producer-consumer approach).
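The fusion of stateless operations mentioned at the start of this answer can be modeled roughly as below: each stage wraps its downstream consumer, so a single accept() call pushes one element through filter and map on the current call stack, which necessarily means the current thread. In the parallel case, each worker would run such a fused chain over its own chunk of the range. This is a heavily simplified, hypothetical model (FusionModel, filterStage, mapStage and run are made-up names); OpenJDK's real machinery uses a package-private Sink chain with more lifecycle methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.LongPredicate;
import java.util.function.LongUnaryOperator;

public class FusionModel {

    // A stateless filter stage: wraps the downstream consumer and forwards
    // only matching elements. No buffering, so no thread hand-off is needed.
    static LongConsumer filterStage(LongPredicate p, LongConsumer downstream) {
        return l -> {
            if (p.test(l)) {
                downstream.accept(l);
            }
        };
    }

    // A stateless map stage: transforms and forwards immediately.
    static LongConsumer mapStage(LongUnaryOperator f, LongConsumer downstream) {
        return l -> downstream.accept(f.applyAsLong(l));
    }

    // Builds filter -> map -> terminal as one fused LongConsumer and records,
    // for each element that reaches the end, the thread that ran the chain.
    static List<String> run(long from, long to) {
        List<String> log = new ArrayList<>();
        LongConsumer terminal = l -> log.add(l + " [" + Thread.currentThread().getName() + "]");
        LongConsumer fused = filterStage(l -> l % 2 == 0, mapStage(l -> l * 10, terminal));
        // One accept() call drives an element through every stage on this thread.
        for (long l = from; l < to; l++) {
            fused.accept(l);
        }
        return log;
    }

    public static void main(String[] args) {
        run(0, 5).forEach(System.out::println);
    }
}
```

A stateful stage such as limit cannot be modeled this way in a parallel run, because it has to coordinate across chunks before passing elements on, which is where the barrier described above comes from.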

Tagir Valeev answered Oct 21 '22 19:10
