The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.
The Fork/Join framework, introduced in Java 7, is an implementation of the divide-and-conquer approach, in which a central ForkJoinPool executes branching ForkJoinTasks. ExecutorService is an Executor that provides methods to manage termination and methods that can produce a Future for tracking the progress of asynchronous tasks.
ForkJoinPool is an implementation of ExecutorService that manages worker threads and provides tools to get information about the thread pool's state and performance. Each worker thread can execute only one task at a time, but the ForkJoinPool doesn't create a separate thread for every single subtask.
The fork/join framework was designed to speed up the execution of tasks that can be divided into other smaller subtasks, executing them in parallel and then combining their results to get a single one.
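A minimal sketch of such recursive splitting, assuming a hypothetical workload (summing a long[]) and an arbitrary threshold of 1,000 elements: each task either sums its range directly or splits it in two, forks one half, computes the other half itself and then joins the forked half.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ArraySum {
    // Below this many elements a task sums sequentially (arbitrary tuning value)
    static final int THRESHOLD = 1_000;

    static final class SumTask extends RecursiveTask<Long> {
        private final long[] data;
        private final int from, to; // half-open range [from, to)

        SumTask(long[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) sum += data[i];
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                     // divide: queue the left half
            long rightSum = right.compute(); // work on the right half in this thread
            return rightSum + left.join();   // combine both partial results
        }
    }

    static long parallelSum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = new long[1_000_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        System.out.println(parallelSum(data)); // 500000500000
    }
}
```

The threshold is the main tuning knob: if it is too small, task overhead dominates; if it is too large, there are too few tasks to keep all workers busy.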
I think the basic misunderstanding is that the Fork/Join examples do NOT show work stealing, but only some kind of standard divide and conquer.
Work stealing would be like this: Worker B has finished his work. He is a kind one, so he looks around and sees Worker A still working very hard. He strolls over and asks: "Hey lad, I could give you a hand." A replies: "Cool, I have this task of 1000 units. So far I have finished 345, leaving 655. Could you please work on number 673 to 1000? I'll do 346 to 672." B says: "OK, let's start so we can go to the pub earlier."
You see: the workers must communicate with each other even after they have started the real work. This is the missing part in the examples.
The examples on the other hand show only something like "use subcontractors":
Worker A: "Dang, I have 1000 units of work. Too much for me. I'll do 500 myself and subcontract 500 to someone else." This goes on until the big task is broken down into small packets of 10 units each. These will be executed by the available workers. But if one packet is a kind of poison pill and takes considerably longer than other packets -- bad luck, the divide phase is over.
The only remaining difference between Fork/Join and splitting the task upfront is this: when splitting upfront, the work queue is full right from the start. Example: 1000 units with a threshold of 10 means the queue has 100 entries. These packets are distributed to the thread pool members.
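For comparison, splitting upfront with an ordinary fixed thread pool might look like the sketch below. Summing the unit numbers is a hypothetical stand-in for real work; with 1000 units and a threshold of 10, all 100 packets exist before any thread starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class UpfrontSplit {
    // Cuts units 1..units into packets of at most `threshold` units before any work starts.
    // "Processing" a unit is a hypothetical stand-in: we just add up the unit numbers.
    static List<Callable<Long>> split(int units, int threshold) {
        List<Callable<Long>> packets = new ArrayList<>();
        for (int start = 1; start <= units; start += threshold) {
            final int from = start;
            final int to = Math.min(start + threshold - 1, units);
            packets.add(() -> {
                long sum = 0;
                for (int u = from; u <= to; u++) sum += u;
                return sum;
            });
        }
        return packets;
    }

    // Hands every packet to a fixed-size pool at once, so its queue is full from the start
    static long runAll(List<Callable<Long>> packets, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            long total = 0;
            for (Future<Long> f : pool.invokeAll(packets)) total += f.get();
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<Long>> packets = split(1000, 10);
        System.out.println("packets queued up front: " + packets.size()); // 100
        System.out.println("total: " + runAll(packets, Runtime.getRuntime().availableProcessors())); // 500500
    }
}
```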
Fork/Join is more complex and tries to keep the number of packets in the queue smaller:
You see: in Fork/Join the queue is smaller (6 in the example) and the "split" and "work" phases are interleaved.
When multiple workers are popping and pushing simultaneously the interactions are not so clear of course.
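The single-worker case can be simulated to see the interleaving. This sketch assumes the same 1000 units and threshold of 10 and a simple halving rule; the exact split points are an assumption, so the peak number of waiting packets need not match the example's count exactly, but it stays far below the 100 packets of the upfront split.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ForkJoinQueueSim {
    // A packet of work covering the inclusive range of units [from, to]
    static final class Packet {
        final int from, to;

        Packet(int from, int to) {
            this.from = from;
            this.to = to;
        }

        int size() {
            return to - from + 1;
        }

        @Override
        public String toString() {
            return "(" + from + ".." + to + ")";
        }
    }

    // One worker, one LIFO stack: pop a packet, split it if it is too big, otherwise execute it.
    // Returns the largest number of packets that were ever waiting in the stack.
    static int simulate(int units, int threshold, boolean verbose) {
        Deque<Packet> stack = new ArrayDeque<>();
        stack.push(new Packet(1, units));
        int maxDepth = 1;
        int executed = 0;
        while (!stack.isEmpty()) {
            Packet p = stack.pop();                    // most recently pushed packet
            if (p.size() > threshold) {                // "split" phase
                int mid = p.from + p.size() / 2 - 1;
                stack.push(new Packet(p.from, mid));   // lower half waits deeper in the stack
                stack.push(new Packet(mid + 1, p.to)); // upper half is popped next
                maxDepth = Math.max(maxDepth, stack.size());
            } else {                                   // "work" phase for a small packet
                executed++;
                if (verbose && executed <= 3) {
                    System.out.println("execute " + p + ", waiting: " + stack);
                }
            }
        }
        return maxDepth;
    }

    public static void main(String[] args) {
        int depth = simulate(1000, 10, true);
        System.out.println("most packets waiting at once: " + depth);
    }
}
```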
If you have n busy threads all working away at 100% independently, that's going to be better than n threads in a Fork-Join (FJ) pool. But it never works out that way.
You might not be able to split the problem precisely into n equal pieces. Even if you do, thread scheduling is some way off being fair, so you'll end up waiting for the slowest thread. If you have multiple tasks, they can each run with less than n-way parallelism (generally more efficient), yet go up to n-way when other tasks have finished.
So why don't we just cut the problem up into FJ-size pieces and have a thread pool work on those? Typical FJ usage cuts the problem into tiny pieces. Doing these in a random order requires a lot of coordination at the hardware level, and the overheads would be a killer. In FJ, tasks are put onto a queue that the owning thread reads in Last In, First Out order (LIFO/stack), while work stealing (in core work, generally) is done First In, First Out (FIFO/"queue"). The result is that long array processing can be done largely sequentially, even though it is broken into tiny chunks. (It may also not be trivial to break the problem up into small, evenly sized chunks in one big bang, say when dealing with some form of unbalanced hierarchy.)
Conclusion: FJ allows more efficient use of hardware threads in uneven situations, which will always be the case if you have more than one thread.
The ultimate goals of thread pools and Fork/Join are alike: both want to utilize the available CPU power as well as they can for maximum throughput. Maximum throughput means that as many tasks as possible should be completed in a sufficiently long period of time. What is needed to do that? (For the following we will assume that there is no shortage of calculation tasks: there is always enough to do for 100% CPU utilisation. Additionally, I use "CPU" equivalently for cores, or virtual cores in the case of hyper-threading.)
Thus, for maximum throughput we need exactly as many active threads as CPUs: fewer threads leave a CPU idle, and more threads only add scheduling and context-switching overhead. In Oracle's blurring example you can either take a fixed-size thread pool with the number of threads equal to the number of available CPUs or use a fork/join pool. It won't make a difference, you are right!
So when will you get into trouble with a thread pool? When a thread blocks because it is waiting for another task to complete. Assume the following example:
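Both options can be sized the same way. A minimal sketch (the class and method names are illustrative only):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

public class PoolSizing {
    // One worker per available CPU (cores, or virtual cores with hyper-threading)
    static int cpus() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Option 1: a classic fixed-size thread pool
    static ExecutorService newFixedPool() {
        return Executors.newFixedThreadPool(cpus());
    }

    // Option 2: a fork/join pool with the same parallelism
    // (new ForkJoinPool() without arguments also defaults to availableProcessors())
    static ForkJoinPool newForkJoinPool() {
        return new ForkJoinPool(cpus());
    }

    public static void main(String[] args) {
        ExecutorService fixed = newFixedPool();
        ForkJoinPool forkJoin = newForkJoinPool();
        System.out.println("CPUs: " + cpus() + ", fork/join parallelism: " + forkJoin.getParallelism());
        fixed.shutdown();
        forkJoin.shutdown();
    }
}
```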
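```java
import java.util.concurrent.*;

class AbcAlgorithm implements Runnable {
    // threadPool (ExecutorService), ATask (Callable<StepAResult>), stepB, stepC: defined elsewhere
    public void run() {
        Future<StepAResult> aFuture = threadPool.submit(new ATask()); // step A on another thread
        StepBResult bResult = stepB();                                  // step B on this thread
        try {
            StepAResult aResult = aFuture.get(); // blocks this pool thread until A is done
            stepC(aResult, bResult);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
```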
What we see here is an algorithm that consists of three steps A, B and C. A and B can be performed independently of each other, but step C needs the results of both A AND B. The algorithm submits task A to the thread pool and performs step B directly. After that, the thread waits for task A to be done as well and continues with step C. If A and B complete at the same time, everything is fine. But what if A takes longer than B? That may be because the nature of task A dictates it, but it may also be because no thread is available for task A at the beginning, so task A has to wait. (If there is only a single CPU available and thus your thread pool has only a single thread, this will even cause a deadlock, but for now that is beside the point.) The point is that the thread that just executed step B now sits blocked, waiting for A. Since we have the same number of threads as CPUs and one thread is blocked, one CPU is idle.
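The single-thread case from the parenthesis can be reproduced directly. In this sketch the steps are hypothetical stand-ins that just produce strings, and a timeout replaces the indefinite get() so that the deadlock becomes observable instead of hanging forever.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingInThreadPool {
    // The A/B/C pattern on a pool with exactly one thread. The outer task occupies that
    // thread while it waits for A, so A can never start.
    static String runOnSingleThreadPool() {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        try {
            Future<String> outer = threadPool.submit(() -> {
                Future<String> aFuture = threadPool.submit(() -> "A"); // step A: queued behind us
                String b = "B";                                        // step B: done inline
                try {
                    String a = aFuture.get(200, TimeUnit.MILLISECONDS); // blocks the only thread
                    return a + b;                                      // step C
                } catch (TimeoutException e) {
                    return "timed out waiting for A";
                }
            });
            return outer.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            threadPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnSingleThreadPool()); // timed out waiting for A
    }
}
```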
Fork/Join solves this problem: In the fork/join framework you'd write the same algorithm as follows:
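```java
class AbcAlgorithm implements Runnable {
    // ATask is assumed to extend RecursiveTask<StepAResult>; stepB and stepC are defined elsewhere
    public void run() {
        ATask aTask = new ATask();
        aTask.fork();                       // push step A onto this worker's queue
        StepBResult bResult = stepB();      // step B runs directly
        StepAResult aResult = aTask.join(); // waits for A, running other tasks meanwhile
        stepC(aResult, bResult);
    }
}
```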
Looks the same, does it not? However, the clue is that aTask.join() will not block the thread. Instead, this is where work stealing comes into play: the thread looks around for other tasks that have been forked in the past and continues with those. First it checks whether the tasks it has forked itself have started processing. If A has not been started by another thread yet, it does A next; otherwise it checks the queues of other threads and steals their work. Once that other thread's task has completed, it checks whether A is completed now. If it is, the algorithm above can call stepC. Otherwise it looks for yet another task to steal. Thus fork/join pools can achieve 100% CPU utilisation, even in the face of blocking actions.
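A runnable version of the A/B/C algorithm, with hypothetical string results standing in for the real steps, shows that join() does not deadlock even with a single worker thread: A is still on that worker's own queue, so the worker simply runs it itself.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class AbcForkJoin {
    // Hypothetical step A as a fork/join task
    static final class ATask extends RecursiveTask<String> {
        @Override
        protected String compute() {
            return "A";
        }
    }

    // The A/B/C algorithm as a fork/join task
    static final class AbcTask extends RecursiveTask<String> {
        @Override
        protected String compute() {
            ATask aTask = new ATask();
            aTask.fork();            // push A onto this worker's own deque
            String b = "B";          // step B runs directly
            String a = aTask.join(); // if A was not stolen, this worker pops and runs it itself
            return a + b;            // step C
        }
    }

    static String runWithParallelism(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.invoke(new AbcTask());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithParallelism(1)); // AB, even with a single worker
    }
}
```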
However, there is a trap: work stealing is only possible for the join() call of ForkJoinTasks. It cannot be done for external blocking actions like waiting for another thread or waiting for an I/O action. So what about those, given that waiting for I/O to complete is a common task? In this case, the second-best thing to do is to add an additional thread to the Fork/Join pool that is stopped again as soon as the blocking action has completed. And the ForkJoinPool can actually do just that if we are using ManagedBlockers.
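A sketch of a ManagedBlocker, adapted from the QueueTaker example in the ForkJoinPool.ManagedBlocker Javadoc. The "I/O" here is simulated by a BlockingQueue that is filled from a thread outside the pool; awaitResponse is a hypothetical helper name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.LinkedBlockingQueue;

public class ManagedBlockerDemo {
    // Tells the pool that take() may block, so it can activate a spare worker meanwhile
    static final class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
        private final BlockingQueue<E> queue;
        private volatile E item;

        QueueTaker(BlockingQueue<E> queue) {
            this.queue = queue;
        }

        @Override
        public boolean block() throws InterruptedException {
            if (item == null) item = queue.take(); // the actual blocking call
            return true;                           // true: no further blocking is needed
        }

        @Override
        public boolean isReleasable() {
            // true if we can proceed without blocking at all
            return item != null || (item = queue.poll()) != null;
        }

        E getItem() {
            return item;
        }
    }

    // Hypothetical task: waits for an externally produced "I/O" result inside a managed block
    static String awaitResponse(ForkJoinPool pool, BlockingQueue<String> responses) {
        ForkJoinTask<String> task = pool.submit(() -> {
            QueueTaker<String> taker = new QueueTaker<>(responses);
            ForkJoinPool.managedBlock(taker); // the pool may compensate while we are blocked
            return taker.getItem().toUpperCase();
        });
        return task.join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        BlockingQueue<String> responses = new LinkedBlockingQueue<>();
        new Thread(() -> { // simulate the I/O completing outside the pool
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            responses.add("done");
        }).start();
        System.out.println(awaitResponse(pool, responses)); // DONE
        pool.shutdown();
    }
}
```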
The JavaDoc for RecursiveTask contains an example that calculates Fibonacci numbers using Fork/Join. For a classic recursive solution, see:
```java
public static long fib(int n) { // long: fib(50) does not fit into an int
    if (n <= 1) {
        return n;
    }
    return fib(n - 1) + fib(n - 2);
}
```
As explained in the JavaDocs, this is a pretty dumb way to calculate Fibonacci numbers, as this algorithm has O(2^n) complexity while much faster approaches exist. However, it is very simple and easy to understand, so we stick with it. Let's assume we want to speed it up with Fork/Join. A naive implementation would look like this:
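```java
import java.util.concurrent.RecursiveTask;

class Fibonacci extends RecursiveTask<Long> {
    private final long n;

    Fibonacci(long n) {
        this.n = n;
    }

    @Override
    public Long compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();                       // fib(n - 1) may be picked up by another worker
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join(); // compute fib(n - 2) here, then wait for f1
    }
}
```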
The steps this task is split into are far too small, so this will perform horribly, but it shows how well the framework fits this kind of problem: the two summands can be calculated independently, but both are needed to build the final result. So one half is computed in another thread. Have fun doing the same with thread pools without getting a deadlock (possible, but not nearly as simple).
Just for completeness: if you actually wanted to calculate Fibonacci numbers using this recursive approach, here is an optimized version:
```java
import java.util.concurrent.RecursiveTask;

class FibonacciBigSubtasks extends RecursiveTask<Long> {
    private final long n;

    FibonacciBigSubtasks(long n) {
        this.n = n;
    }

    @Override
    public Long compute() {
        return fib(n);
    }

    private long fib(long n) {
        if (n <= 1) {
            return n; // same base case as the classic version
        }
        // Only fork when the subproblem is large and few tasks are already queued locally
        if (n > 10 && getSurplusQueuedTaskCount() < 2) {
            final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
            final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
            f1.fork();
            return f2.compute() + f1.join();
        } else {
            // Otherwise recurse sequentially in this thread
            return fib(n - 1) + fib(n - 2);
        }
    }
}
```
This keeps the number of subtasks much smaller (and each subtask much bigger), because they are only split when n > 10 && getSurplusQueuedTaskCount() < 2 is true, which means that there are significantly more than 100 method calls to do (n > 10) and there are not many tasks already waiting (getSurplusQueuedTaskCount() < 2).
On my computer (4 cores, 8 when counting Hyper-Threading; Intel(R) Core(TM) i7-2720QM CPU @ 2.20GHz), fib(50) takes 64 seconds with the classic approach and just 18 seconds with the Fork/Join approach, which is quite a noticeable gain, although not as much as theoretically possible.
Fork/join is different from a thread pool because it implements work stealing. From the "Fork/Join" page of the Java Tutorials:
As with any ExecutorService, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.
Say you have two threads and 4 tasks a, b, c, d, which take 1, 1, 5 and 6 seconds respectively. Initially, a and b are assigned to thread 1 and c and d to thread 2. In a thread pool, thread 2 needs 5 + 6 = 11 seconds, so the whole run takes 11 seconds. With fork/join, thread 1 finishes and can steal work from thread 2, so task d ends up being executed by thread 1. Thread 1 executes a, b and d (1 + 1 + 6 = 8 seconds), thread 2 just c (5 seconds). Overall time: 8 seconds, not 11.
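The arithmetic can be checked with a tiny discrete-event simulation. It is a sketch under simplifying assumptions (stealing is instantaneous and free, each worker takes its own tasks from the top of its deque and thieves take from the bottom); it is not how ForkJoinPool is implemented internally.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class StealingMakespan {
    // Returns the time at which the last task finishes. initialTasks[w] lists worker w's
    // task durations in the order the worker would run them itself.
    static int makespan(int[][] initialTasks, boolean stealing) {
        int n = initialTasks.length;
        List<Deque<Integer>> deques = new ArrayList<>();
        for (int[] tasks : initialTasks) {
            Deque<Integer> d = new ArrayDeque<>();
            // push in reverse so that the first listed task is on top (at the tail)
            for (int i = tasks.length - 1; i >= 0; i--) d.addLast(tasks[i]);
            deques.add(d);
        }
        int[] freeAt = new int[n];       // time at which each worker becomes idle
        boolean[] done = new boolean[n]; // worker found nothing left to do
        int end = 0;
        while (true) {
            // the unfinished worker that becomes idle first picks its next task
            int w = -1;
            for (int i = 0; i < n; i++)
                if (!done[i] && (w < 0 || freeAt[i] < freeAt[w])) w = i;
            if (w < 0) return end;
            Integer task = deques.get(w).pollLast();          // own work, LIFO
            for (int v = 0; task == null && stealing && v < n; v++)
                if (v != w) task = deques.get(v).pollFirst(); // steal from the other end
            if (task == null) {
                done[w] = true;
                continue;
            }
            freeAt[w] += task;
            end = Math.max(end, freeAt[w]);
        }
    }

    public static void main(String[] args) {
        int[][] tasks = { {1, 1}, {5, 6} }; // thread 1: a, b; thread 2: c, d (seconds)
        System.out.println("without stealing: " + makespan(tasks, false)); // 11
        System.out.println("with stealing:    " + makespan(tasks, true));  // 8
    }
}
```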
EDIT: As Joonas points out, tasks are not necessarily pre-allocated to a thread. The idea of fork/join is that a thread can choose to split a task into multiple sub-pieces. So to restate the above:
We have two tasks (ab) and (cd), which take 2 and 11 seconds respectively. Thread 1 starts to execute ab and splits it into two sub-tasks a and b. Similarly, thread 2 splits cd into two sub-tasks c and d. When thread 1 has finished a and b, it can steal d from thread 2.
Everyone above is correct that the benefits are achieved by work stealing, but let me expand on why this is.
The primary benefit is the efficient coordination between worker threads. The work has to be split up and reassembled, which requires coordination. As you can see in A.H.'s answer above, each thread has its own work list. An important property of this list is that it is sorted (large tasks at the top and small tasks at the bottom). Each thread executes the tasks at the bottom of its own list and steals tasks from the top of other threads' lists.
The result of this is that the owner and the thieves work at opposite ends of a list, so they rarely get in each other's way; a thief grabs a large piece of work, which it then subdivides on its own list, so steals are rare; and large subtrees of the work are split up and reassembled by the same thread without any inter-thread coordination.
Most other divide and conquer schemes using thread pools require more inter-thread communication and coordination.
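The ordering of such a work list can be sketched with a plain ArrayDeque. In this hypothetical single-worker example the worker splits 1..1000 depth-first, always continuing with the upper half; the "top" in the answer's terms (large tasks) corresponds to the head of the deque and the "bottom" (small tasks) to its tail.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class StealFromTop {
    // One worker splits units [1, units] depth-first, always continuing with the upper half
    // and leaving the lower half in its deque, until its own piece is at most `threshold`.
    // Each deque entry is an inclusive range {from, to}.
    static Deque<int[]> splitDown(int units, int threshold) {
        Deque<int[]> deque = new ArrayDeque<>();
        int from = 1, to = units;
        while (to - from + 1 > threshold) {
            int mid = (from + to) / 2;
            deque.addLast(new int[] {from, mid}); // lower half waits; entries shrink over time
            from = mid + 1;                       // keep working on the upper half
        }
        return deque;
    }

    public static void main(String[] args) {
        Deque<int[]> deque = splitDown(1000, 10);
        int[] ownerNext = deque.peekLast(); // newest and smallest: the owner pops this next
        int[] stolen = deque.peekFirst();   // oldest and largest: a thief takes this one
        System.out.println("owner continues with " + ownerNext[0] + ".." + ownerNext[1]); // 986..993
        System.out.println("thief steals " + stolen[0] + ".." + stolen[1]);               // 1..500
    }
}
```

A single steal thus moves half of the remaining work, which the thief can then split further on its own deque.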