Disclaimer: It's the first time I'm using Java's Fork-Join framework, so I'm not 100% sure I'm using it correctly. Java is also not my main programming language, so this could also be relevant.
Given the following SSCCE:
import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
class ForkCalculator extends RecursiveAction
{
private final Integer[] delayTasks;
public ForkCalculator(Integer[] delayTasks)
{
this.delayTasks = delayTasks;
}
@Override
protected void compute()
{
if (this.delayTasks.length == 1) {
this.computeDirectly();
return;
}
Integer halfway = this.delayTasks.length / 2;
ForkJoinTask.invokeAll(
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, 0, halfway)
),
new ForkCalculator(
Arrays.copyOfRange(this.delayTasks, halfway, this.delayTasks.length)
)
);
}
private void computeDirectly()
{
Integer delayTask = this.delayTasks[0];
try {
Thread.sleep(delayTask);
} catch (InterruptedException ex) {
System.err.println(ex.getMessage());
System.exit(2);
}
System.out.println("Finished computing task with delay " + delayTask);
}
}
public final class ForkJoinBlocker
{
public static void main(String[] args)
{
ForkCalculator calculator = new ForkCalculator(
new Integer[]{1500, 1400, 1950, 2399, 4670, 880, 5540, 1975, 3010, 4180, 2290, 1940, 510}
);
ForkJoinPool pool = new ForkJoinPool(
Runtime.getRuntime().availableProcessors()
);
pool.invoke(calculator);
//make it a daemon thread
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(
new TimerTask() {
@Override
public void run()
{
System.out.println(pool.toString());
}
},
100,
2000
);
}
}
So I create a ForkJoinPool
to which I submit some tasks which do some processing. I replaced them with Thread.sleep()
for the purposes of this example, to keep it simple.
In my actual program, this is a very long list of tasks, so I want to periodically print the current status on the standard-output. I try to do that on a separate thread using a scheduled TimerTask
.
However, I noticed something that I wasn't expecting: in my example the output is something like:
Finished computing task with delay 1500
Finished computing task with delay 2399
Finished computing task with delay 1400
Finished computing task with delay 4180
Finished computing task with delay 1950
Finished computing task with delay 5540
Finished computing task with delay 880
.......
Which means the "status-task" is never executed.
However if I modify my code to move the pool.invoke(calculator);
at the very end, then it works as expected:
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 1500
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 5, submissions = 0]
Finished computing task with delay 2399
Finished computing task with delay 1400
java.util.concurrent.ForkJoinPool@59bf63ba[Running, parallelism = 4, size = 4, active = 4, running = 0, steals = 0, tasks = 4, submissions = 0]
Finished computing task with delay 4180
Finished computing task with delay 1950
......
The only conclusion I can draw is that ForkJoinPool::invoke()
blocks the main-thread (it only returns AFTER all the tasks in the pool are finished).
I expected the code in the main-thread to continue to be executed, while the tasks in the fork-join-pool are handled asynchronously.
My question is: does this happen because I used the framework incorrectly? Is there something I have to correct in my code?
I noticed one of ForkJoinPool
s constructors has a boolean asyncMode
parameter but, from what I can tell from the implementation, this is just to decide between FIFO_QUEUE
and LIFO_QUEUE
execution-modes (not exactly sure what those are):
public ForkJoinPool(
int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode
) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
ForkJoinPool acts recursively, unlike Executor threads, which splits the task and submits smaller chunks to worker Threads. ForkJoinPool takes a big task, splits it into smaller tasks, and those smaller tasks split themselves again into subtasks until each subtask is atomic or not divisible. So it works recursively.
Its implementation restricts the maximum number of running threads to 32767 and attempting to create pools with greater than this size will result to IllegalArgumentException .
The Fork/Join framework in Java 7 is an implementation of the Divide and Conquer algorithm, in which a central ForkJoinPool executes branching ForkJoinTasks. ExecutorService is an Executor that provides methods to manage the progress-tracking and termination of asynchronous tasks.
ForkJoinPool#commonPool() is a static thread-pool, which is lazily initialized when is actually needed. Two major concepts use the commonPool inside JDK: CompletableFuture and Parallel Streams .
Basically invoke()
will wait for the entire task to finish before returning, so yes the main thread is blocking. After that, the Timer
doesn't have time to execute because it runs on a daemon thread.
You can simply use execute()
instead of invoke()
which runs the task asynchronously. Then you can join()
on the ForkJoinTask
to wait for the result, during which the Timer
would be running:
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
pool.execute(calculator);
//make it a daemon thread
Timer timer = new Timer(true);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
System.out.println(pool.toString());
}
}, 100, 2000);
calculator.join(); // wait for computation
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With