Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why does the parallel stream not use all the threads of the ForkJoinPool? [duplicate]

So I know that if you use the parallelStream without a custom ForkJoinPool it will use the default ForkJoinPool which by default has one less threads as you have processors.

So, as stated here (and also in the other answer of that question) in order to have more parallelism, you have to:

submit the parallel stream execution to your own ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(doSomething));

So, I did this:

import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
import com.google.common.collect.Sets;

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool(1000);

        IntStream stream = IntStream.range(0, 999999);

        final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>());

        forkJoinPool.submit(() -> {
            stream.parallel().forEach(n -> {

                System.out.println("Processing n: " + n);
                try {
                    Thread.sleep(500);
                    thNames.add(Thread.currentThread().getName());
                    System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }).get();
    }
}

I made a Set of thread Names in order to see how many threads are being created, and also logged the number of active threads that the pool has and both numbers don't grow up more that 16, so that means that the parallelism here is not being more than 16 (why even 16?). If I do not use the forkJoinPool, I get 4 as parallelism, which is according to the number of processors I have.

Why does it give me 16 and not 1000?

like image 238
Pablo Matias Gomez Avatar asked Apr 29 '16 20:04

Pablo Matias Gomez


People also ask

How many threads does parallel stream use?

In case of Parallel stream,4 threads are spawned simultaneously and it internally using Fork and Join pool to create and manage threads.

How many threads does ForkJoinPool Commonpool have?

To arrange the same value as is used by default for the common pool, use 256 plus the parallelism level. (By default, the common pool allows a maximum of 256 spare threads.) Using a value (for example Integer.

What is parallelism ForkJoinPool?

A ForkJoinPool is constructed with a given target parallelism level; by default, equal to the number of available processors. The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others.

What does stream parallel do?

Parallel streams enable us to execute code in parallel on separate cores. The final result is the combination of each individual outcome.


1 Answers

Update

Originally this answer was an elaborate explanation claiming that the ForkJoinPool applies back-pressure and doesn't even reach the prescribed parallelism level, because there are always idle workers available to process the stream.

That's incorrect.

The actual answer is provided in the original question to which this was marked as duplicate - using a custom ForkJoinPool for stream processing is not officially supported, and when using forEach, the default pool parallelism is used to determine the stream spliterator behavior.

Here's an example how when tasks are manually submitted to a custom ForkJoinPool, the pool's active thread count easily reaches its parallelism level:

for (int i = 0; i < 1_000_000; ++i) {
   forkJoinPool.submit(() -> {
      try {
         Thread.sleep(1);
         thNames.add(Thread.currentThread().getName());
         System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount() + " parallelism: " + forkJoinPool.getParallelism());
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
   });
}

Thanks to Stuart Marks for pointing this out and to Sotirios Delimanolis for arguing that my original answer is wrong :)

like image 73
Dimitar Dimitrov Avatar answered Sep 20 '22 15:09

Dimitar Dimitrov