 

Why does ForkJoinPool::invoke() block the main thread?

Disclaimer: It's the first time I'm using Java's Fork-Join framework, so I'm not 100% sure I'm using it correctly. Java is also not my main programming language, so this could also be relevant.


Given the following SSCCE:

import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

class ForkCalculator extends RecursiveAction
{
    private final Integer[] delayTasks;

    public ForkCalculator(Integer[] delayTasks)
    {
        this.delayTasks = delayTasks;
    }

    @Override
    protected void compute()
    {
        if (this.delayTasks.length == 1) {
            this.computeDirectly();
            return;
        }

        Integer halfway = this.delayTasks.length / 2;

        ForkJoinTask.invokeAll(
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, 0, halfway)
            ),
            new ForkCalculator(
                Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
            )
        );
    }

    private void computeDirectly()
    {
        Integer delayTask = this.delayTasks[0];

        try {
            Thread.sleep(delayTask);
        } catch (InterruptedException ex) {
            System.err.println(ex.getMessage());
            System.exit(2);
        }

        System.out.println("Finished computing task with delay " + delayTask);
    }
}

public final class ForkJoinBlocker
{
    public static void main(String[] args)
    {
        ForkCalculator calculator = new ForkCalculator(
            new Integer[]{1500, 1400, 1950, 2399, 4670, 880, 5540, 1975, 3010, 4180, 2290, 1940, 510}
        );

        ForkJoinPool pool = new ForkJoinPool(
            Runtime.getRuntime().availableProcessors()
        );

        pool.invoke(calculator);

        //make it a daemon thread
        Timer timer = new Timer(true);

        timer.scheduleAtFixedRate(
            new TimerTask() {
                @Override
                public void run()
                {
                    System.out.println(pool.toString());
                }
            },
            100,
            2000
        );
    }
}

So I create a ForkJoinPool and submit some tasks to it that do some processing. For the purposes of this example I replaced that processing with Thread.sleep(), to keep it simple.

In my actual program this is a very long list of tasks, so I want to periodically print the current status to standard output. I try to do that on a separate thread using a scheduled TimerTask.

However, I noticed something that I wasn't expecting: in my example the output is something like:

Finished computing task with delay 1500
Finished computing task with delay 2399
Finished computing task with delay 1400
Finished computing task with delay 4180
Finished computing task with delay 1950
Finished computing task with delay 5540
Finished computing task with delay 880
.......

This means the status task is never executed.

However, if I modify my code to move pool.invoke(calculator); to the very end, it works as expected:

java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 1500
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 2399
Finished computing task with delay 1400
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 4, submissions = 0]
Finished computing task with delay 4180
Finished computing task with delay 1950
......

The only conclusion I can draw is that ForkJoinPool::invoke() blocks the main thread (it only returns AFTER all the tasks in the pool are finished).

I expected the code in the main thread to continue executing while the tasks in the fork-join pool are handled asynchronously.

My question is: does this happen because I used the framework incorrectly? Is there something I have to correct in my code?

I noticed that one of ForkJoinPool's constructors has a boolean asyncMode parameter but, from what I can tell from the implementation, it only selects between the FIFO_QUEUE and LIFO_QUEUE execution modes (I'm not exactly sure what those are):

public ForkJoinPool(
    int parallelism,
    ForkJoinWorkerThreadFactory factory,
    UncaughtExceptionHandler handler,
    boolean asyncMode
) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}
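To check that asyncMode does not change whether invoke() blocks, here is a minimal sketch (the class AsyncModeDemo and method invokeWaits() are hypothetical names). It uses the four-argument constructor quoted above with asyncMode set to true; according to the ForkJoinPool Javadoc, that flag only establishes FIFO scheduling, instead of the default stack-based (LIFO) order, for forked tasks that are never joined. invoke() still returns only after the task has completed.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicBoolean;

public class AsyncModeDemo {

    /** Returns true if invoke() only came back after the task had finished. */
    static boolean invokeWaits() {
        // asyncMode = true: FIFO scheduling for forked tasks that are never joined.
        ForkJoinPool pool = new ForkJoinPool(
                Runtime.getRuntime().availableProcessors(),
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,   // no UncaughtExceptionHandler
                true);  // asyncMode

        AtomicBoolean finished = new AtomicBoolean(false);
        RecursiveAction slowTask = new RecursiveAction() {
            @Override
            protected void compute() {
                try {
                    Thread.sleep(300); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finished.set(true);
            }
        };

        pool.invoke(slowTask);           // blocks the caller, asyncMode or not
        boolean result = finished.get(); // the task has already completed here
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) {
        System.out.println("finished when invoke() returned: " + invokeWaits());
    }
}
```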
Radu Murzea asked Oct 01 '18 13:10



1 Answer

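Basically, invoke() waits for the entire task to finish before returning, so yes, the main thread blocks. After that, main() schedules the Timer and returns right away; because the Timer runs on a daemon thread (and the pool's worker threads are daemon threads too), the JVM exits before the Timer gets a chance to run.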
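The daemon-thread part of this explanation can be checked with a small sketch (DaemonCheck is a hypothetical class name). It runs a probe on a ForkJoinPool worker and inside a TimerTask created with new Timer(true), as in the question, and reports whether each thread is a daemon. The JVM exits once all non-daemon threads have finished, so if both report true, nothing keeps the program alive after main() returns. Latches are used instead of join() so that the probe is guaranteed to run on the worker thread, not on the caller.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DaemonCheck {

    // Waits up to 5 s for the latch; returns false on timeout or interrupt.
    private static boolean await(CountDownLatch latch) {
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns {poolWorkerIsDaemon, timerThreadIsDaemon}. */
    static boolean[] check() {
        // Probe a ForkJoinPool worker thread.
        ForkJoinPool pool = new ForkJoinPool(1);
        AtomicBoolean workerDaemon = new AtomicBoolean(false);
        CountDownLatch workerProbed = new CountDownLatch(1);
        pool.execute(() -> {
            workerDaemon.set(Thread.currentThread().isDaemon());
            workerProbed.countDown();
        });
        await(workerProbed);
        pool.shutdown();

        // Probe the thread behind new Timer(true), as used in the question.
        Timer timer = new Timer(true);
        AtomicBoolean timerDaemon = new AtomicBoolean(false);
        CountDownLatch timerProbed = new CountDownLatch(1);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                timerDaemon.set(Thread.currentThread().isDaemon());
                timerProbed.countDown();
            }
        }, 0);
        await(timerProbed);
        timer.cancel();

        return new boolean[]{workerDaemon.get(), timerDaemon.get()};
    }

    public static void main(String[] args) {
        boolean[] r = check();
        System.out.println("ForkJoinPool worker is daemon: " + r[0]);
        System.out.println("Timer(true) thread is daemon:  " + r[1]);
    }
}
```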

You can simply use execute() instead of invoke(), which runs the task asynchronously. Then call join() on the ForkJoinTask to wait for it to finish; the Timer keeps running while the main thread waits:

ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.execute(calculator);

// make it a daemon thread
Timer timer = new Timer(true);

timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        System.out.println(pool.toString());
    }
}, 100, 2000);

calculator.join(); // wait for computation
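The difference between execute() and join() can be made visible deterministically with a minimal sketch (ExecuteThenJoin is a hypothetical class name). The task parks on a CountDownLatch until main releases it, so the check right after execute() cannot race with the task. Note that the same program written with invoke() would deadlock: main would block inside invoke() and never reach countDown().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicBoolean;

public class ExecuteThenJoin {

    /** Returns {doneRightAfterExecute, doneAfterJoin}. */
    static boolean[] run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        CountDownLatch gate = new CountDownLatch(1); // main releases the task explicitly
        AtomicBoolean done = new AtomicBoolean(false);

        RecursiveAction task = new RecursiveAction() {
            @Override
            protected void compute() {
                try {
                    gate.await(); // park until main counts down
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.set(true);
            }
        };

        pool.execute(task);                         // returns immediately
        boolean doneRightAfterExecute = done.get(); // false: task cannot finish before countDown()
        gate.countDown();                           // let the task finish
        task.join();                                // now block until the task completes
        boolean doneAfterJoin = done.get();         // true: join() waited for completion
        pool.shutdown();
        return new boolean[]{doneRightAfterExecute, doneAfterJoin};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("done right after execute(): " + r[0]);
        System.out.println("done after join():          " + r[1]);
    }
}
```

In the real program, the window between execute() and join() is exactly where the Timer's status output gets a chance to run.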
M A answered Oct 11 '22 14:10