I would like to globally replace the common thread pool that Java parallel streams use by default, for example for
    IntStream.range(0,100).parallel().forEach(i -> {
        doWork();
    });
I know that it is possible to use a dedicated ForkJoinPool by submitting such an instruction to a dedicated thread pool (see Custom thread pool in Java 8 parallel stream). The question here is: can the common pool itself be replaced globally by another implementation, for example Executors.newFixedThreadPool(10)?

Remark: The reason I would like to replace the F/J pool is that it appears to have a bug which makes it unusable for nested parallel loops.
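For reference, the dedicated-pool workaround mentioned above can be sketched roughly as follows. The class name, the helper `sumInPool`, and the parallelism of 4 are made up for illustration. Note that a parallel stream started inside a ForkJoinPool task runs in that pool only as an implementation detail of the current JDK, not as a documented guarantee, and this does not change the pool used by streams started elsewhere.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class DedicatedPoolExample {

    // Starts the parallel stream from inside a task of the given pool and
    // waits for the result. On current JDKs the stream's subtasks then run
    // in that pool instead of the common pool (undocumented behavior).
    static long sumInPool(ForkJoinPool pool, int n) {
        return pool.submit(() -> IntStream.range(0, n).parallel().asLongStream().sum()).join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4); // 4 is an arbitrary illustrative parallelism
        try {
            System.out.println(sumInPool(pool, 100)); // sum of 0..99
        } finally {
            pool.shutdown();
        }
    }
}
```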
Nested parallel loops have poor performance and may lead to deadlocks, see http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html
For example: The following code leads to a deadlock:
    // Outer loop
    IntStream.range(0,24).parallel().forEach(i -> {
        // (omitted:) do some heavy work here (consuming majority of time)
        // Need to synchronize for a small "subtask" (e.g. updating a result)
        synchronized(this) {
            // Inner loop (does s.th. completely free of side-effects, i.e. expected to work)
            IntStream.range(0,100).parallel().forEach(j -> {
                // do work here
            });
        }
    });
(even without any additional code at "do work here", given that parallelism is set to < 12).
My question is how to replace the FJP. If you would like to discuss nested parallel loops, you might check Nested Java 8 parallel forEach loop perform poor. Is this behavior expected?
I think that's not the way the stream API is intended to be used. It seems you're (mis)using it for simple parallel task execution (focusing on the task, not the data) instead of parallel stream processing (focusing on the data in the stream). Your code somewhat violates one of the main principles for streams (I write 'somewhat' because it is not really forbidden, only discouraged): avoid state and side effects.
Apart from that (or maybe because of the side effects), you're using heavy synchronization within your outer loop, which is anything but harmless!
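As a rough illustration of the "no state, no side effects" style: if the synchronized "subtask" only merges a per-item result into a shared total (an assumption, the question does not say what it updates), the stream can do the combining itself and no lock is needed. `heavyWork` is a hypothetical stand-in for the real computation.

```java
import java.util.stream.IntStream;

class ReductionExample {

    // Hypothetical stand-in for the heavy, side-effect-free work per item.
    static long heavyWork(int i) {
        return (long) i * i;
    }

    // Each item is mapped to a result and the stream combines the partial
    // results; there is no shared mutable state and no synchronized block.
    static long total(int n) {
        return IntStream.range(0, n).parallel().mapToLong(ReductionExample::heavyWork).sum();
    }

    public static void main(String[] args) {
        System.out.println(total(24));
    }
}
```

For results that are not simple sums, collect() with a suitable collector plays the same role.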
Although not mentioned in the documentation, parallel streams use the common ForkJoinPool internally. No matter whether or not this is a lack of documentation, we must simply accept that fact. The JavaDoc of ForkJoinTask states:
It is possible to define and use ForkJoinTasks that may block, but doing so requires three further considerations: (1) Completion of few if any other tasks should be dependent on a task that blocks on external synchronization or I/O. Event-style async tasks that are never joined (for example, those subclassing CountedCompleter) often fall into this category. (2) To minimize resource impact, tasks should be small; ideally performing only the (possibly) blocking action. (3) Unless the ForkJoinPool.ManagedBlocker API is used, or the number of possibly blocked tasks is known to be less than the pool's ForkJoinPool.getParallelism level, the pool cannot guarantee that enough threads will be available to ensure progress or good performance.
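To make point (3) more concrete, here is a minimal sketch of the ManagedBlocker API, modelled on the lock example in the ForkJoinPool.ManagedBlocker JavaDoc. It swaps the synchronized block for a ReentrantLock, because a monitor cannot be acquired through a ManagedBlocker. The names `ManagedLockExample`, `LockBlocker` and `runLocked` are made up for illustration, and I have not verified this against the exact setup in the question; it only shows how to tell the pool that a worker may block so that it can compensate with a spare thread.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

class ManagedLockExample {

    // Adapts a ReentrantLock to ManagedBlocker so the pool is informed
    // before a worker thread blocks on the lock.
    static final class LockBlocker implements ForkJoinPool.ManagedBlocker {
        private final ReentrantLock lock;
        private boolean held;

        LockBlocker(ReentrantLock lock) {
            this.lock = lock;
        }

        @Override
        public boolean block() {
            if (!held) {
                lock.lock();
                held = true;
            }
            return true; // no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            // Try the cheap non-blocking path first.
            return held || (held = lock.tryLock());
        }
    }

    // Acquires the lock via managedBlock, runs the action, releases the lock.
    static void runLocked(ReentrantLock lock, Runnable action) {
        try {
            ForkJoinPool.managedBlock(new LockBlocker(lock));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the lock", e);
        }
        try {
            action.run();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        AtomicInteger counter = new AtomicInteger();
        // Same shape as the nested loops in the question, with the lock
        // acquired through managedBlock instead of synchronized.
        IntStream.range(0, 24).parallel().forEach(i ->
            runLocked(lock, () ->
                IntStream.range(0, 100).parallel().forEach(j -> counter.incrementAndGet())));
        System.out.println(counter.get());
    }
}
```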
Again, it seems that you're using streams as a replacement for a simple for-loop and an executor service.
If you just want to run n tasks in parallel, use an ExecutorService. If tasks have to wait for other tasks to complete, use a ForkJoinPool (with ForkJoinTasks) instead. (It ensures a constant number of threads without the danger of a deadlock caused by too many tasks waiting for others to complete, as waiting tasks do not block their executing threads.)
Don't mix up ExecutorService and ForkJoinPool. They are (usually) not a replacement for each other!
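For the plain "run n independent tasks" case, a minimal sketch with a fixed thread pool could look like this. `heavyWork`, `runAll` and the pool size are illustrative assumptions, not something taken from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FixedPoolExample {

    // Hypothetical stand-in for one independent unit of work.
    static int heavyWork(int i) {
        return i * i;
    }

    // Runs n independent tasks on a fixed pool and returns the results
    // in task order (invokeAll keeps the order of the submitted tasks).
    static List<Integer> runAll(int n, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int index = i;
                tasks.add(() -> heavyWork(index));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(24, 10));
    }
}
```

Tasks on such a pool must not wait for other tasks queued on the same pool, otherwise all threads can end up blocked; that is the case where a ForkJoinPool is the better fit.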