 

How to (globally) replace the common thread pool backend of Java parallel streams?

I would like to globally replace the common thread pool that Java parallel streams use by default, for example for

IntStream.range(0,100).parallel().forEach(i -> {
    doWork();
});
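To see which threads actually execute such a loop, one can record Thread.currentThread() for every element. The sketch below (CommonPoolCheck and its methods are illustrative names) checks that each recorded thread is either the calling thread, which also takes part in the work, or a worker of ForkJoinPool.commonPool(). The printed thread names, such as "ForkJoinPool.commonPool-worker-1", are an implementation detail and may differ between JDK versions.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

public class CommonPoolCheck {

    // Records every thread that processes an element of a parallel stream.
    static Set<Thread> threadsUsed(int n) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        IntStream.range(0, n).parallel().forEach(i -> threads.add(Thread.currentThread()));
        return threads;
    }

    // True if every thread is either the caller or a worker of the common pool.
    static boolean onlyCallerOrCommonPool(Set<Thread> threads, Thread caller) {
        for (Thread t : threads) {
            boolean commonWorker = t instanceof ForkJoinWorkerThread
                    && ((ForkJoinWorkerThread) t).getPool() == ForkJoinPool.commonPool();
            if (t != caller && !commonWorker) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Set<Thread> threads = threadsUsed(10_000);
        threads.forEach(t -> System.out.println(t.getName()));
        System.out.println(onlyCallerOrCommonPool(threads, Thread.currentThread()));
    }
}
```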

I know that it is possible to use a dedicated ForkJoinPool by submitting such a statement to that pool (see Custom thread pool in Java 8 parallel stream). The question here is:

  • Is it possible to replace the common ForkJoinPool with some other implementation (say, an Executors.newFixedThreadPool(10))?
  • Is it possible to do so through a global setting, e.g., a JVM property?
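For reference, the dedicated-pool workaround from the linked question looks roughly like the following. It is a minimal sketch that relies on an implementation detail rather than a documented guarantee: when a parallel stream's terminal operation starts on a worker thread of some ForkJoinPool, its subtasks are forked into that pool instead of the common pool. CustomPoolExample, sumInPool and allElementsRunInPool are hypothetical names. Note that this only swaps one ForkJoinPool for another; the stream framework forks ForkJoinTasks, so an arbitrary ExecutorService such as Executors.newFixedThreadPool(10) cannot be plugged in this way.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

public class CustomPoolExample {

    // Runs a parallel stream from inside a dedicated ForkJoinPool and returns its sum.
    static long sumInPool(int parallelism, int n) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The terminal operation starts on a worker of 'pool', so (implementation
            // detail, not a documented guarantee) its subtasks are forked into 'pool'.
            return pool.submit(() -> IntStream.range(0, n).parallel().asLongStream().sum()).join();
        } finally {
            pool.shutdown();
        }
    }

    // True if every element was processed by a worker thread of the dedicated pool.
    static boolean allElementsRunInPool(int parallelism, int n) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() -> IntStream.range(0, n).parallel().allMatch(i -> {
                Thread t = Thread.currentThread();
                return t instanceof ForkJoinWorkerThread
                        && ((ForkJoinWorkerThread) t).getPool() == pool;
            })).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInPool(10, 1000));              // 499500
        System.out.println(allElementsRunInPool(10, 10_000)); // expected: true
    }
}
```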

Remark: The reason I would like to replace the F/J pool is that it appears to have a bug that makes it unusable for nested parallel loops.

Nested parallel loops perform poorly and may lead to deadlocks; see http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

For example, the following code leads to a deadlock:

// Outer loop
IntStream.range(0,24).parallel().forEach(i -> {

    // (omitted:) do some heavy work here (consuming majority of time)

    // Need to synchronize for a small "subtask" (e.g. updating a result)
    synchronized(this) {
        // Inner loop (does something completely free of side effects, i.e., expected to work)
        IntStream.range(0,100).parallel().forEach(j -> {
            // do work here
        });
    }
});

(This happens even without any additional code at "do work here", provided the parallelism is set to less than 12.)
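One way to avoid this particular pattern, sketched here under the assumption that the inner loop's result can be computed independently of the shared state, is to run the inner parallel loop outside the synchronized block and lock only for the short update. A thread holding the monitor then never waits for fork/join subtasks, which is the lock/join combination the deadlock above depends on. heavyWork and the summed values are hypothetical placeholders. This restructures the code rather than replacing the pool.

```java
import java.util.stream.IntStream;

public class NestedLoopSketch {

    private long total = 0; // shared result, guarded by 'this'

    // Hypothetical placeholder for the heavy, lock-free work of the outer loop.
    private static long heavyWork(int i) {
        return i;
    }

    long run() {
        IntStream.range(0, 24).parallel().forEach(i -> {
            long outer = heavyWork(i);

            // Inner parallel loop runs WITHOUT holding the monitor, so no thread
            // waits on fork/join subtasks while it holds the lock on 'this'.
            long inner = IntStream.range(0, 100).parallel().asLongStream().sum();

            // Only the short, non-parallel update of the shared state is synchronized.
            synchronized (this) {
                total += outer + inner;
            }
        });
        synchronized (this) {
            return total;
        }
    }

    public static void main(String[] args) {
        System.out.println(new NestedLoopSketch().run()); // 119076
    }
}
```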

My question is how to replace the FJP. If you would like to discuss nested parallel loops, you might check Nested Java 8 parallel forEach loop perform poor. Is this behavior expected?

Christian Fries asked May 09 '14 11:05



1 Answer

I think that's not the way the stream API is intended to be used. It seems you're (mis)using it simply for parallel task execution (focusing on the tasks, not the data) instead of for parallel stream processing (focusing on the data in the stream). Your code somewhat violates one of the main principles of streams (I write 'somewhat' because it is not actually forbidden, only discouraged): avoid state and side effects.

Apart from that (or maybe because of the side effects), you're using heavy synchronization within your outer loop, which is anything but harmless!

Although this is not mentioned in the documentation, parallel streams use the common ForkJoinPool internally. Whether or not this is a gap in the documentation, we simply have to accept that fact. The JavaDoc of ForkJoinTask states:

It is possible to define and use ForkJoinTasks that may block, but doing so requires three further considerations: (1) Completion of few if any other tasks should be dependent on a task that blocks on external synchronization or I/O. Event-style async tasks that are never joined (for example, those subclassing CountedCompleter) often fall into this category. (2) To minimize resource impact, tasks should be small; ideally performing only the (possibly) blocking action. (3) Unless the ForkJoinPool.ManagedBlocker API is used, or the number of possibly blocked tasks is known to be less than the pool's ForkJoinPool.getParallelism level, the pool cannot guarantee that enough threads will be available to ensure progress or good performance.
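The ForkJoinPool.ManagedBlocker API from point (3) lets a task announce that it is about to block, so the pool may activate a spare thread to maintain its parallelism. The sketch below adapts the QueueTaker example from the ManagedBlocker JavaDoc; the demo method sumTaken and its producer thread are illustrative assumptions. A plain synchronized block cannot be wrapped this way, because a monitor offers no non-blocking try-acquire for isReleasable(); a java.util.concurrent lock or queue is needed instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ManagedBlockerExample {

    // Adapted from the QueueTaker example in the ForkJoinPool.ManagedBlocker JavaDoc.
    static final class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
        private final BlockingQueue<E> queue;
        private volatile E item = null;

        QueueTaker(BlockingQueue<E> queue) {
            this.queue = queue;
        }

        @Override
        public boolean block() throws InterruptedException {
            if (item == null) {
                item = queue.take(); // the potentially blocking call
            }
            return true; // no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            // Non-blocking check: done immediately if an element is available.
            return item != null || (item = queue.poll()) != null;
        }

        E getItem() {
            return item;
        }
    }

    // Takes one element, letting the current ForkJoinPool (if any) compensate while blocked.
    static <E> E take(BlockingQueue<E> queue) throws InterruptedException {
        QueueTaker<E> taker = new QueueTaker<>(queue);
        ForkJoinPool.managedBlock(taker);
        return taker.getItem();
    }

    // Illustrative demo: parallel-stream tasks each take one element that a separate
    // producer thread puts into the queue; returns the sum of all taken elements.
    static int sumTaken(int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(n);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        return IntStream.range(0, n).parallel().map(i -> {
            try {
                return take(queue).intValue();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }).sum();
    }

    public static void main(String[] args) {
        System.out.println(sumTaken(100)); // 4950 (0 + 1 + ... + 99)
    }
}
```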

Again, it seems that you're using streams as a replacement for a simple for-loop and an executor service.

  • If you just want to execute n tasks in parallel, use an ExecutorService.
  • If you have a more complex case where tasks create subtasks, consider using a ForkJoinPool (with ForkJoinTasks) instead. (It keeps the number of active threads constant without the danger of a deadlock caused by too many tasks waiting for others to complete, because a task waiting in join() typically does not block its executing thread; the worker helps execute other tasks instead.)
  • If you want to process data (in parallel), consider using the stream API.
  • You cannot 'install' a custom common pool. It's created internally in private static code.
  • But you can influence the parallelism, the thread factory and the exception handler of the common pool through certain system properties (see the JavaDoc of ForkJoinPool).
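As a sketch of the system-property route: java.util.concurrent.ForkJoinPool.common.parallelism (and likewise ...common.threadFactory and ...common.exceptionHandler) is read once, when the ForkJoinPool class creates the common pool. It is therefore usually passed on the command line, e.g. java -Djava.util.concurrent.ForkJoinPool.common.parallelism=3 MyApp (MyApp being a placeholder), or set programmatically before anything touches the common pool, as below (class and method names hypothetical). Setting it afterwards has no effect, and none of these properties lets you substitute a different pool implementation.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolProperties {

    // Sets the common pool's parallelism and returns the value the pool reports.
    // Only effective if the common pool has not yet been created in this JVM.
    static int configureAndReadParallelism(int parallelism) {
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",
                Integer.toString(parallelism));
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        // Expected to print 3 when run in a fresh JVM.
        System.out.println(configureAndReadParallelism(3));
    }
}
```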

Don't mix up ExecutorService and ForkJoinPool. They are (usually) not a replacement for each other!
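To illustrate the distinction, here is a sketch of both styles (class, method and task names are hypothetical): independent tasks submitted to a fixed-size ExecutorService, and a divide-and-conquer RecursiveTask on a ForkJoinPool, where each task forks one half of its range and later joins it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class ExecutorVsForkJoin {

    // Independent tasks: submit them to a fixed-size ExecutorService and collect results.
    static long sumOfSquares(int n) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final long value = i;
                futures.add(executor.submit(() -> value * value)); // hypothetical task
            }
            long sum = 0;
            for (Future<Long> f : futures) {
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    // Tasks that create subtasks: sums the half-open range [from, to) recursively.
    static final class RangeSum extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000;
        private final long from;
        private final long to;

        RangeSum(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (long i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            long mid = (from + to) >>> 1;
            RangeSum left = new RangeSum(from, mid);
            left.fork();                                  // schedule left half asynchronously
            long right = new RangeSum(mid, to).compute(); // compute right half in this thread
            return right + left.join();                   // joining worker may run other tasks meanwhile
        }
    }

    static long sumRange(long n) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new RangeSum(0, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10));   // 285
        System.out.println(sumRange(1_000_000)); // 499999500000
    }
}
```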

isnot2bad answered Oct 01 '22 13:10
