So I know that if you use parallelStream without a custom ForkJoinPool, it will use the default ForkJoinPool, which by default has one fewer thread than you have processors.
So, as stated here (and also in the other answer to that question), to get more parallelism you have to:
submit the parallel stream execution to your own ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(doSomething));
So, I did this:
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class Main {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Custom pool with a target parallelism of 1000
        ForkJoinPool forkJoinPool = new ForkJoinPool(1000);
        IntStream stream = IntStream.range(0, 999999);
        // Names of the threads that actually process stream elements
        final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>());
        // Run the whole parallel stream from inside the custom pool
        forkJoinPool.submit(() -> {
            stream.parallel().forEach(n -> {
                System.out.println("Processing n: " + n);
                try {
                    Thread.sleep(500);
                    thNames.add(Thread.currentThread().getName());
                    System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }).get();
    }
}
I made a Set of thread names to see how many threads are being created, and I also logged the pool's active thread count. Neither number ever grows beyond 16, which means the parallelism here never exceeds 16 (and why 16, of all numbers?). If I don't use the custom ForkJoinPool, I get a parallelism of 4, which matches the number of processors I have.
Why does it give me 16 and not 1000?
In the case of a parallel stream, 4 threads are spawned simultaneously here, and the stream internally uses a fork/join pool to create and manage those threads.
From the ForkJoinPool Javadoc, on the maximumPoolSize parameter of the Java 9+ constructor: "To arrange the same value as is used by default for the common pool, use 256 plus the parallelism level. (By default, the common pool allows a maximum of 256 spare threads.) Using a value (for example Integer.
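If you want to cap how many threads a custom pool may create, Java 9 added a ForkJoinPool constructor that exposes maximumPoolSize. The sketch below (the class and method names are hypothetical) mirrors the common pool's default rule of 256 plus the parallelism level and passes the default values the Javadoc recommends for the other arguments. Note that this caps the total number of worker threads, including spare threads that replace blocked ones; it does not change how a parallel stream splits its work.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {
    // Hypothetical helper: a pool whose thread cap follows the common pool's
    // default rule (256 spare threads on top of the parallelism level).
    // Requires Java 9 or later.
    static ForkJoinPool newPool(int parallelism) {
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,                  // no UncaughtExceptionHandler (the default)
                false,                 // asyncMode off: LIFO, like new ForkJoinPool(n)
                parallelism,           // corePoolSize: same as parallelism (the default)
                256 + parallelism,     // maximumPoolSize: the common pool's default rule
                1,                     // minimumRunnable: 1 ensures liveness
                null,                  // saturate: none, so exceeding the cap throws
                60, TimeUnit.SECONDS); // keepAliveTime: the documented default
    }

    public static void main(String[] args) {
        ForkJoinPool pool = newPool(8);
        System.out.println("parallelism: " + pool.getParallelism());
        pool.shutdown();
    }
}
```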
A ForkJoinPool is constructed with a given target parallelism level; by default, equal to the number of available processors. The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others.
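To check these defaults on a given machine, a small sketch (hypothetical class name) can print the parallelism of a pool built with the no-argument constructor next to the common pool's. In OpenJDK the common pool defaults to availableProcessors() - 1 (at least 1), which fits the question: on 4 processors, 3 common-pool workers plus the calling thread make up the 4 threads observed.

```java
import java.util.concurrent.ForkJoinPool;

class ParallelismDefaults {
    // Parallelism chosen by the no-argument constructor: one per available processor
    static int ownPoolParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("new ForkJoinPool():   " + ownPoolParallelism());
        // Defaults to availableProcessors() - 1 (at least 1) unless the
        // java.util.concurrent.ForkJoinPool.common.parallelism property overrides it
        System.out.println("common pool:          " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```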
Parallel streams enable us to execute code in parallel on separate cores. The final result is the combination of each individual outcome.
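As a minimal sketch of how the individual outcomes are combined (the class and method names are hypothetical), the three-argument Stream.reduce makes both steps explicit: the accumulator folds the elements of one chunk into a partial result, and the combiner merges partial results from different chunks. The combiner must be associative and compatible with the accumulator, or the parallel result could differ from the sequential one.

```java
import java.util.stream.IntStream;

class CombineDemo {
    // Sums i*i over [0, n) in parallel: each chunk builds its own partial sum
    // with the accumulator, and the partial sums are merged with the combiner.
    static long sumOfSquares(int n) {
        return IntStream.range(0, n)
                .parallel()
                .boxed()
                .reduce(0L,
                        (partial, i) -> partial + i.longValue() * i.longValue(), // accumulator
                        Long::sum);                                              // combiner
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(1000)); // 332833500
    }
}
```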
Update
Originally this answer was an elaborate explanation claiming that the ForkJoinPool applies back-pressure and doesn't even reach the prescribed parallelism level, because there are always idle workers available to process the stream.
That's incorrect.
The actual answer is provided in the original question of which this was marked a duplicate: using a custom ForkJoinPool for stream processing is not officially supported, and when using forEach, the default (common) pool's parallelism is used to determine how the stream's spliterator is split.
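The observed ceiling of exactly 16 can be reproduced with a little arithmetic. This sketch relies on OpenJDK implementation details, not on anything the stream specification promises: java.util.stream.AbstractTask sets its leaf target to ForkJoinPool.getCommonPoolParallelism() << 2, a parallel forEach stops splitting once a chunk is at or below size / leafTarget, and IntStream.range splits roughly in half. With 4 processors the common pool has parallelism 3, so the threshold is 999999 / 12 = 83333, and halving 999999 four times yields 16 chunks of about 62500 elements. Only 16 tasks ever exist, so no more than 16 workers can be busy, whatever the custom pool's parallelism. The helper below is hypothetical.

```java
class SplitEstimate {
    // Hypothetical helper: counts the leaf chunks a parallel forEach over `size`
    // elements would produce, ASSUMING OpenJDK's threshold of
    // size / (commonParallelism * 4) and splits into halves.
    static int leafCount(long size, int commonParallelism) {
        long threshold = Math.max(1L, size / ((long) commonParallelism << 2));
        return count(size, threshold);
    }

    private static int count(long size, long threshold) {
        // A chunk at or below the threshold, or a single element, is processed as-is
        if (size <= threshold || size <= 1) {
            return 1;
        }
        long left = size / 2;
        return count(left, threshold) + count(size - left, threshold);
    }

    public static void main(String[] args) {
        // 4 processors -> common pool parallelism 3 -> threshold 83333
        System.out.println(leafCount(999_999, 3)); // 16
    }
}
```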
Here's an example showing that when tasks are submitted manually to a custom ForkJoinPool, the pool's active thread count easily reaches its parallelism level:
// Reuses forkJoinPool and thNames from the code in the question
for (int i = 0; i < 1_000_000; ++i) {
    forkJoinPool.submit(() -> {
        try {
            Thread.sleep(1);
            thNames.add(Thread.currentThread().getName());
            System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount() + " parallelism: " + forkJoinPool.getParallelism());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });
}
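A self-contained variant of the manual-submission experiment, sketched with hypothetical names: instead of sleeping, each task blocks on a CountDownLatch until released, so every task is forced to occupy its own worker thread. Submitting as many tasks as the pool's parallelism should then show that many distinct workers running at once, for example 64, well past the 16 seen with the parallel stream. The bounded wait keeps the sketch from hanging if that assumption does not hold.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

class ManualSubmitDemo {
    // Submits `parallelism` blocking tasks to a pool of the same parallelism and
    // returns how many distinct worker threads were running them at the same time.
    static int concurrentWorkers(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch started = new CountDownLatch(parallelism);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < parallelism; i++) {
                pool.execute(() -> {
                    names.add(Thread.currentThread().getName());
                    started.countDown();
                    try {
                        release.await(); // keep this worker busy so it cannot take another task
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Bounded wait until every task has started on its own thread
            started.await(10, TimeUnit.SECONDS);
            return names.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return names.size();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("concurrent workers: " + concurrentWorkers(64));
    }
}
```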
Thanks to Stuart Marks for pointing this out and to Sotirios Delimanolis for arguing that my original answer is wrong :)