
In a Java 7+ ForkJoinPool, is it possible to cancel a task and all subtasks?

My program searches for a solution (any solution) to a problem using a divide-and-conquer approach, implemented with recursion and RecursiveTasks. I fork a task for the first branch of the division, then recurse into the second branch. If the second branch finds a solution, I cancel the first branch; otherwise I wait for its result.

This is perhaps not optimal. One approach would be for any of the launched tasks to throw an exception if a solution is found. But then, how would I cancel all the launched tasks? Does cancelling a task also cancel all sub-tasks?

David Monniaux asked May 26 '14 11:05





2 Answers

You can use a simple approach with a task manager that keeps a list of every forked task. For example:

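import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ForkJoinTask;

public class TaskManager<T> {

    // Tasks register themselves from several worker threads, so the list
    // must be thread-safe (a plain ArrayList is not).
    private final List<ForkJoinTask<T>> tasks = new CopyOnWriteArrayList<>();

    public void addTask(ForkJoinTask<T> task) {
        tasks.add(task);
    }

    // Cancels every registered task except the given one.
    public void cancelAllExcludeTask(ForkJoinTask<T> cancelTask) {
        for (ForkJoinTask<T> task : tasks) {
            if (task != cancelTask) {
                // The boolean argument has no effect in ForkJoinTask's
                // default implementation; running code is not interrupted.
                task.cancel(true);
            }
        }
    }

    // Cancels only the given task, provided it was registered.
    public void cancelTask(ForkJoinTask<T> cancelTask) {
        for (ForkJoinTask<T> task : tasks) {
            if (task == cancelTask) {
                task.cancel(true);
            }
        }
    }

}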

And the task:

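public class YourTask extends RecursiveTask<Integer> {

    private final TaskManager<Integer> taskManager;

    public YourTask(TaskManager<Integer> taskManager) {
        this.taskManager = taskManager;
    }

    @Override
    protected Integer compute() {
        // stuff and fork (only while the problem can still be divided)
        YourTask newTask = new YourTask(taskManager);
        // do not forget to save it in the manager's list
        taskManager.addTask(newTask);
        newTask.fork();

        // another logic

        // if current task should be cancelled
        taskManager.cancelTask(this);

        // or if you have decided to cancel all other tasks
        taskManager.cancelAllExcludeTask(this);

        return null; // placeholder: return the real result here
    }
}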
Vladimir Kishlaly answered Oct 03 '22 19:10



The framework cannot forcibly cancel a running task for the same reason you cannot forcibly stop a thread. See the documentation on Thread.stop() for all the reasons. What locks could the task be holding? What outside resources could it have linkage to? All the same Thread.stop() reasons apply to tasks as well (after all, tasks run on threads). You need to tell the task to stop, just as you tell a thread to stop.
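To make the question's last point concrete, here is a minimal sketch of what ForkJoinTask.cancel() does and does not do. The class names (CancelDemo, Parent, Leaf) are hypothetical and exist only for illustration. It relies on two documented behaviours: cancel() only marks the task it is called on, and the boolean argument has no effect in the default implementation.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; none of these names come from the original posts.
final class CancelDemo {

    // A trivial subtask that would return 42 if it ever ran.
    static final class Leaf extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 42;
        }
    }

    // A parent task that owns one subtask.
    static final class Parent extends RecursiveTask<Integer> {
        final Leaf child = new Leaf();

        @Override
        protected Integer compute() {
            child.fork();
            return child.join();
        }
    }

    // Cancels the parent before it starts and reports
    // { parent cancelled?, child cancelled?, did join() throw? }.
    static boolean[] cancelParentOnly() {
        Parent parent = new Parent();
        parent.cancel(true); // the argument is ignored by the default implementation

        boolean joinThrew = false;
        try {
            parent.join(); // a cancelled task reports CancellationException
        } catch (CancellationException e) {
            joinThrew = true;
        }
        return new boolean[] { parent.isCancelled(), parent.child.isCancelled(), joinThrew };
    }
}
```

In this sketch the parent ends up cancelled while its child is untouched, which is why some shared signal between tasks is needed if a whole subtree has to stop.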

I manage another fork/join project that uses the scatter-gather technique. The way I do a cancel, or short-circuit, is that every task I create is passed an object (PassObject) that has a

protected volatile boolean stop_now = false;

and a method for stopping the task

protected void stopNow() {stop_now = true; }

Each task periodically checks stop_now and, when it is true, ends gracefully.

Unfortunately, the stop_now needs to be volatile since another thread is going to set it. This can add significant overhead if you check it frequently.

How to set this field in another task gets a little tricky. Each task I create also contains a reference to the array of references to every other task

int nbr_tasks = nbr_elements / threshold;
// this holds the common class passed to each task
PassObject[] passList = new PassObject[nbr_tasks];
for (int i = 0; i < nbr_tasks; i++)
    passList[i] = new PassObject(passList,… other parms);

Once the list is formed I fork() each object in passList. Each PassObject contains a reference to the array, passList, which contains a reference to every object that is passed to each task. Therefore, every task knows about every other task, and when one task wants to cancel the others it simply calls the cancelOthers method with a reference to the passList.

private void cancelOthers (PassObject[] others) {
    // tell all tasks to stop
    for (int i = 0, max = others.length; i < max; i++)
        others[i].stopNow();
}
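Put together, the fragments above might look something like the following sketch. It is not the author's actual project code: the Worker class, the search method, the foundAt field and the use of a linear array search as the workload are all assumptions made for illustration. Only the PassObject idea, the volatile flag, stopNow() and cancelOthers come from the text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical sketch of the scatter-gather short-circuit described above.
final class ScatterGatherSketch {

    // Shared state handed to each task; mirrors the PassObject idea.
    static final class PassObject {
        private final PassObject[] all;             // every task's PassObject
        private volatile boolean stopRequested = false;
        volatile int foundAt = -1;                  // assumed result slot

        PassObject(PassObject[] all) { this.all = all; }

        void stopNow() { stopRequested = true; }
        boolean shouldStop() { return stopRequested; }

        // Tell all tasks (including this one) to stop.
        void cancelOthers() {
            for (PassObject p : all) p.stopNow();
        }
    }

    // One chunk of the work: scan data[from, to) for target.
    static final class Worker extends RecursiveAction {
        private final PassObject pass;
        private final int[] data;
        private final int from, to, target;

        Worker(PassObject pass, int[] data, int from, int to, int target) {
            this.pass = pass; this.data = data;
            this.from = from; this.to = to; this.target = target;
        }

        @Override
        protected void compute() {
            for (int i = from; i < to; i++) {
                if (pass.shouldStop()) return;      // cooperative exit
                if (data[i] == target) {
                    pass.foundAt = i;
                    pass.cancelOthers();
                    return;
                }
            }
        }
    }

    // Returns the index of some occurrence of target, or -1 if absent.
    static int search(final int[] data, final int target, int nbrTasks) {
        PassObject[] passList = new PassObject[nbrTasks];
        final List<Worker> workers = new ArrayList<>();
        int chunk = (data.length + nbrTasks - 1) / nbrTasks;
        for (int i = 0; i < nbrTasks; i++) {
            passList[i] = new PassObject(passList);
            int from = Math.min(i * chunk, data.length);
            int to = Math.min(from + chunk, data.length);
            workers.add(new Worker(passList[i], data, from, to, target));
        }
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new RecursiveAction() {
                @Override
                protected void compute() {
                    invokeAll(workers);             // fork all workers and wait
                }
            });
        } finally {
            pool.shutdown();
        }
        for (PassObject p : passList) {
            if (p.foundAt >= 0) return p.foundAt;
        }
        return -1;
    }
}
```

This sketch checks the flag on every iteration for simplicity; as noted above, reading a volatile that often has a cost, so a real task would probably check less frequently.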

If you’re using Java 8, you can do a form of scatter-gather with the CountedCompleter class instead of RecursiveTask. For Java 7, or if you still want to use RecursiveTask, the first task in the recursion needs to create an AtomicBoolean field (AtomicBoolean stop_now = new AtomicBoolean(false);) and pass a reference to it to every new RecursiveTask it creates. With recursion, you don’t know at the outset how many levels of tasks you’ll need, so a fixed array of per-task objects is not practical. Again, each task needs to check the boolean periodically and, when it is true, end gracefully.
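For the RecursiveTask case, a minimal sketch of the shared AtomicBoolean approach could look like this. The workload (finding a value in an int array), the SearchTask name, the THRESHOLD value and the check interval are all assumptions chosen for illustration; only the idea of one AtomicBoolean created at the root and passed to every subtask comes from the text.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical divide-and-conquer search that stops once any task succeeds.
final class SearchTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 1_000;    // assumed leaf size

    private final int[] data;
    private final int from, to, target;
    private final AtomicBoolean stopNow;           // shared by the whole task tree

    SearchTask(int[] data, int from, int to, int target, AtomicBoolean stopNow) {
        this.data = data; this.from = from; this.to = to;
        this.target = target; this.stopNow = stopNow;
    }

    @Override
    protected Integer compute() {
        if (stopNow.get()) return -1;              // a solution already exists elsewhere
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                // Check the flag only every 64 elements to limit overhead.
                if ((i & 63) == 0 && stopNow.get()) return -1;
                if (data[i] == target) {
                    stopNow.set(true);             // tell every other task to finish
                    return i;
                }
            }
            return -1;
        }
        int mid = (from + to) >>> 1;
        SearchTask left = new SearchTask(data, from, mid, target, stopNow);
        left.fork();                               // first branch runs asynchronously
        int right = new SearchTask(data, mid, to, target, stopNow).compute();
        int leftResult = left.join();              // returns quickly once the flag is set
        return right >= 0 ? right : leftResult;
    }

    // The root creates the flag and hands it down to every subtask.
    static int find(int[] data, int target) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(
                new SearchTask(data, 0, data.length, target, new AtomicBoolean(false)));
        } finally {
            pool.shutdown();
        }
    }
}
```

Because every task still gets joined, nothing has to be cancelled explicitly: once the flag is set, the remaining tasks see it at their next check and return -1 almost immediately.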

The above is just a hint of how you can do a cancel. Every application is different. What I do works for my application – but the logic is the same. You need something common in every task that a task can set and every other task can see.

I'd add more code but the code insert only is taking one line at a time and it isn't practical.

edharned answered Oct 03 '22 19:10
