ForkJoinPool.invoke() and ForkJoinTask.invoke() or compute()

I was reading about the Java fork/join framework. What is the benefit of instantiating a ForkJoinPool and calling pool.invoke(task), rather than calling invoke() directly on an implementation of ForkJoinTask (e.g. RecursiveTask)? What exactly happens when we call each of these two methods, both named invoke?

From the source, it seems that if recursiveTask.invoke is called, it will call its exec and eventually compute, in a managed thread pool manner. That makes it even more confusing why we have the idiom pool.invoke(task).

I wrote some simple code to test for performance difference, but I didn't see any. Maybe the test code is wrong? See below:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class MyForkJoinTask extends RecursiveAction {

    private static int totalWorkInMillis = 20000;
    protected static int sThreshold = 1000;

    private int workInMillis;


    public MyForkJoinTask(int work) {
        this.workInMillis = work;
    }

    // Simulate the leaf work by sleeping for workInMillis.
    protected void computeDirectly() {
        try {
            Thread.sleep(workInMillis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void compute() {
        if (workInMillis < sThreshold) {
            computeDirectly();
            return;
        }

        int discountedWork = (int) (workInMillis * 0.9);
        int split = discountedWork / 2;

        invokeAll(new MyForkJoinTask(split),
                new MyForkJoinTask(split));
    }

    public static void main(String[] args) throws Exception {
        System.out.printf("Total work is %d in millis.%n", totalWorkInMillis);
        System.out.printf("Threshold is %d in millis.%n", sThreshold);

        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println(Integer.toString(processors) + " processor"
                + (processors != 1 ? "s are " : " is ")
                + "available");

        MyForkJoinTask fb = new MyForkJoinTask(totalWorkInMillis);

        ForkJoinPool pool = new ForkJoinPool();

        long startTime = System.currentTimeMillis();


        // These two seem to make no difference!
        pool.invoke(fb);
//        fb.compute();


        long endTime = System.currentTimeMillis();

        System.out.println("Took " + (endTime - startTime) +
                " milliseconds.");
    }
}
Boyang asked Dec 07 '15 11:12


1 Answer

The compute() method of the RecursiveTask class is just an abstract method that contains the task code. It doesn't use a new thread from the pool, and if you call it normally, it's not run in a pool managed thread.

The invoke method on the fork join pool submits a task to the pool, which then starts running on a separate thread, calls the compute method on that thread, and then waits for a result.

You can see this in the wording of the Javadoc for RecursiveTask and ForkJoinPool. The pool's invoke() method actually performs the task, whereas the compute() method just encapsulates the computation.

protected abstract V compute()

The main computation performed by this task.

And ForkJoinPool

public <T> T invoke(ForkJoinTask<T> task)

Performs the given task, returning its result upon completion. ...

So when you call compute() directly, you are running the first call to compute outside of the fork join pool. You can test this by adding this log line inside the compute method:

System.out.println(this.inForkJoinPool());

You can also check whether it's running in the same thread as the caller by logging the thread id:

System.out.println(Thread.currentThread().getId());
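To make the two log lines above concrete, here is a minimal sketch that runs the same task both ways and reports where compute() executed. The class name WhereAmITask and its helper methods are made up for illustration; only inForkJoinPool(), ForkJoinPool.invoke() and Thread.currentThread() come from the JDK.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo task: returns whether compute() ran on a pool worker thread.
class WhereAmITask extends RecursiveTask<Boolean> {
    @Override
    protected Boolean compute() {
        System.out.println(Thread.currentThread().getName()
                + " inForkJoinPool=" + inForkJoinPool());
        return inForkJoinPool();
    }

    // Calls compute() directly, so it runs on the caller's own thread.
    static boolean runDirectly() {
        return new WhereAmITask().compute();
    }

    // Hands the task to an explicit pool and waits for the result.
    static boolean runInPool() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new WhereAmITask());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct call in pool?  " + runDirectly());
        System.out.println("pool.invoke in pool? " + runInPool());
    }
}
```

When started from an ordinary main thread, the direct call should report false and the pool.invoke call should report true, with a worker thread name in the second log line.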

Once you call invokeAll, the subtasks included in that call can then run in a pool. Note, though, that this is NOT necessarily the pool you created just before calling compute(). On Java 8 and later, work forked from a thread that doesn't belong to any pool goes to the common pool (ForkJoinPool.commonPool()), so you can comment out your new ForkJoinPool() code and it will still run. Interestingly, the Java 7 doc says that the invokeAll() method will throw an exception if it's called outside of a pool-managed thread, but the Java 8 doc doesn't. I haven't tested it on Java 7, mind you (only 8), but quite possibly your code would throw an exception when calling compute() directly on Java 7.
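Here is a rough sketch, assuming Java 8 or later, of how you could check which pool the subtasks land in when no pool is created at all. PoolProbe and its seen queue are hypothetical names; getPool(), invokeAll() and ForkJoinPool.commonPool() are the real JDK calls. Each compute() call records whether it ran on the caller's thread, in the common pool, or in some other pool.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical probe task: records where each compute() call executed.
class PoolProbe extends RecursiveAction {
    static final Queue<String> seen = new ConcurrentLinkedQueue<>();
    private final int depth;

    PoolProbe(int depth) {
        this.depth = depth;
    }

    @Override
    protected void compute() {
        // getPool() returns null when the current thread is not a pool worker.
        ForkJoinPool p = getPool();
        seen.add(p == null ? "caller thread"
                : p == ForkJoinPool.commonPool() ? "common pool" : "other pool");
        if (depth > 0) {
            invokeAll(new PoolProbe(depth - 1), new PoolProbe(depth - 1));
        }
    }

    // Runs the task tree with task.invoke(), without creating any pool.
    static Queue<String> runWithoutExplicitPool() {
        seen.clear();
        new PoolProbe(3).invoke();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runWithoutExplicitPool());
    }
}
```

The split between "caller thread" and "common pool" entries varies from run to run, because the calling thread may end up executing some forked subtasks itself while it waits. What should not appear is "other pool", since no other pool exists here.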

The reason both runs report the same time is that millisecond resolution isn't fine enough to show the difference between starting the first compute call on a pool-managed thread and just running it on an existing thread, especially in a run dominated by Thread.sleep.
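If you want to look at that overhead in isolation, one option is to time a task that does nothing and use System.nanoTime() instead of System.currentTimeMillis(). This is only a sketch: NoOpTask and its helper methods are invented for the example, and a single measurement like this is noisy (JIT warm-up, thread start-up), so treat the numbers as indicative at best.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical no-op task, so the timing is mostly scheduling overhead.
class NoOpTask extends RecursiveAction {
    @Override
    protected void compute() {
        // Intentionally empty.
    }

    // Time a direct compute() call on the caller's thread.
    static long nanosDirect() {
        long start = System.nanoTime();
        new NoOpTask().compute();
        return System.nanoTime() - start;
    }

    // Time submitting the same task to a pool and waiting for it.
    static long nanosViaPool(ForkJoinPool pool) {
        long start = System.nanoTime();
        pool.invoke(new NoOpTask());
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println("direct:   " + nanosDirect() + " ns");
        System.out.println("via pool: " + nanosViaPool(pool) + " ns");
        pool.shutdown();
    }
}
```

For anything you intend to rely on, a benchmark harness with warm-up and repeated runs would be a better choice than this one-shot timing.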

The way the OCA/OCP study guide by Sierra and Bates recommends you use the fork join framework is to call invoke() from the pool. It makes it clear which pool you are using, and it also means you can submit multiple tasks to the same pool, which saves the overhead of creating a new pool each time. Logically, it's also cleaner to keep all of your task computation within a pool-managed thread (or at least I think it is).

pool.invoke() runs the task on a particular pool, rather than leaving it up to the framework to pick one when task.invoke or task.invokeAll is called for the first time. It means you can re-use the pool for new tasks, and specify things like the number of threads (the parallelism level) when creating the pool. That's the difference. Add those log lines to your code, play around with it, and you'll see what it's doing.
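As a small illustration of that last point, here is a sketch of one pool being created once with an explicit parallelism and then reused for two tasks. RangeSum, its THRESHOLD and the sumTwice helper are hypothetical; the ForkJoinPool(int) constructor, invoke(), fork() and join() are standard JDK API.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: sums the integers in [from, to) by splitting the range.
class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;
    private final int from;
    private final int to;

    RangeSum(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid, to);
        left.fork();                           // schedule the left half asynchronously
        return right.compute() + left.join();  // do the right half here, then combine
    }

    // One pool with the requested parallelism, reused for two separate tasks.
    static long[] sumTwice(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            long first = pool.invoke(new RangeSum(0, 10_000));
            long second = pool.invoke(new RangeSum(0, 100_000));
            return new long[] { first, second };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] sums = sumTwice(4);
        System.out.println(sums[0] + " " + sums[1]);
    }
}
```

Both tasks share the same worker threads here, so the pool's start-up cost is paid only once.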

AndyN answered Nov 03 '22 15:11