Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java ExecutorService: awaitTermination of all recursively created tasks

I use an ExecutorService to execute a task. This task can recursively create other tasks which are submitted to the same ExecutorService and those child tasks can do that, too.

I now have the problem that I want to wait until all the tasks are done (that is, all tasks are finished and they did not submit new ones) before I continue.

I cannot call ExecutorService.shutdown() in the main thread because this prevents new tasks from being accepted by the ExecutorService.

And Calling ExecutorService.awaitTermination() seems to do nothing if shutdown hasn't been called.

So I am kinda stuck here. It can't be that hard for the ExecutorService to see that all workers are idle, can it? The only inelegant solution I could come up with is to directly use a ThreadPoolExecutor and query its getPoolSize() every once in a while. Is there really no better way do do that?

like image 723
Christoph Avatar asked Feb 10 '11 14:02

Christoph


People also ask

How do I stop all threads in ExecutorService?

ExecutorService interface provides 3 methods shutdown(), shutdownNow() and awaitTermination​() for controlling the termination of tasks submitted to executors.

How does ExecutorService work in Java?

The executor service creates and maintains a reusable pool of threads for executing submitted tasks. The service also manages a queue, which is used when there are more tasks than the number of threads in the pool and there is a need to queue up tasks until there is a free thread available to execute the task.

What are the advantages of using ExecutorService instead of creating threads directly?

ExecutorService abstracts away many of the complexities associated with the lower-level abstractions like raw Thread . It provides mechanisms for safely starting, closing down, submitting, executing, and blocking on the successful or abrupt termination of tasks (expressed as Runnable or Callable ).

What is the difference between executor and ExecutorService?

Executor just executes stuff you give it. ExecutorService adds startup, shutdown, and the ability to wait for and look at the status of jobs you've submitted for execution on top of Executor (which it extends). This is a perfect answer, short and clear.


2 Answers

This really is an ideal candidate for a Phaser. Java 7 is coming out with this new class. Its a flexible CountdonwLatch/CyclicBarrier. You can get a stable version at JSR 166 Interest Site.

The way it is a more flexible CountdownLatch/CyclicBarrier is because it is able to not only support an unknown number of parties (threads) but its also reusable (thats where the phase part comes in)

For each task you submit you would register, when that task is completed you arrive. This can be done recursively.

Phaser phaser = new Phaser(); ExecutorService e = //  Runnable recursiveRunnable = new Runnable(){    public void run(){       //do work recursively if you have to        if(shouldBeRecursive){            phaser.register();            e.submit(recursiveRunnable);       }        phaser.arrive();    } }  public void doWork(){    int phase = phaser.getPhase();     phaser.register();    e.submit(recursiveRunnable);     phaser.awaitAdvance(phase); } 

Edit: Thanks @depthofreality for pointing out the race condition in my previous example. I am updating it so that executing thread only awaits advance of the current phase as it blocks for the recursive function to complete.

The phase number won't trip until the number of arrives == registers. Since prior to each recursive call invokes register a phase increment will happen when all invocations are complete.

like image 135
John Vint Avatar answered Oct 02 '22 19:10

John Vint


If number of tasks in the tree of recursive tasks is initially unknown, perhaps the easiest way would be to implement your own synchronization primitive, some kind of "inverse semaphore", and share it among your tasks. Before submitting each task you increment a value, when task is completed, it decrements that value, and you wait until the value is 0.

Implementing it as a separate primitive explicitly called from tasks decouples this logic from the thread pool implementation and allows you to submit several independent trees of recursive tasks into the same pool.

Something like this:

public class InverseSemaphore {     private int value = 0;     private Object lock = new Object();      public void beforeSubmit() {         synchronized(lock) {             value++;         }     }      public void taskCompleted() {         synchronized(lock) {             value--;             if (value == 0) lock.notifyAll();         }     }      public void awaitCompletion() throws InterruptedException {         synchronized(lock) {             while (value > 0) lock.wait();         }     } } 

Note that taskCompleted() should be called inside a finally block, to make it immune to possible exceptions.

Also note that beforeSubmit() should be called by the submitting thread before the task is submitted, not by the task itself, to avoid possible "false completion" when old tasks are completed and new ones not started yet.

EDIT: Important problem with usage pattern fixed.

like image 24
axtavt Avatar answered Oct 02 '22 20:10

axtavt