I use an ExecutorService
to execute a task. This task can recursively create other tasks which are submitted to the same ExecutorService
and those child tasks can do that, too.
I now have the problem that I want to wait until all the tasks are done (that is, all tasks are finished and they did not submit new ones) before I continue.
I cannot call ExecutorService.shutdown()
in the main thread because this prevents new tasks from being accepted by the ExecutorService
.
And Calling ExecutorService.awaitTermination()
seems to do nothing if shutdown
hasn't been called.
So I am kinda stuck here. It can't be that hard for the ExecutorService
to see that all workers are idle, can it? The only inelegant solution I could come up with is to directly use a ThreadPoolExecutor
and query its getPoolSize()
every once in a while. Is there really no better way do do that?
ExecutorService interface provides 3 methods shutdown(), shutdownNow() and awaitTermination() for controlling the termination of tasks submitted to executors.
The executor service creates and maintains a reusable pool of threads for executing submitted tasks. The service also manages a queue, which is used when there are more tasks than the number of threads in the pool and there is a need to queue up tasks until there is a free thread available to execute the task.
ExecutorService abstracts away many of the complexities associated with the lower-level abstractions like raw Thread . It provides mechanisms for safely starting, closing down, submitting, executing, and blocking on the successful or abrupt termination of tasks (expressed as Runnable or Callable ).
Executor just executes stuff you give it. ExecutorService adds startup, shutdown, and the ability to wait for and look at the status of jobs you've submitted for execution on top of Executor (which it extends). This is a perfect answer, short and clear.
This really is an ideal candidate for a Phaser. Java 7 is coming out with this new class. Its a flexible CountdonwLatch/CyclicBarrier. You can get a stable version at JSR 166 Interest Site.
The way it is a more flexible CountdownLatch/CyclicBarrier is because it is able to not only support an unknown number of parties (threads) but its also reusable (thats where the phase part comes in)
For each task you submit you would register, when that task is completed you arrive. This can be done recursively.
Phaser phaser = new Phaser(); ExecutorService e = // Runnable recursiveRunnable = new Runnable(){ public void run(){ //do work recursively if you have to if(shouldBeRecursive){ phaser.register(); e.submit(recursiveRunnable); } phaser.arrive(); } } public void doWork(){ int phase = phaser.getPhase(); phaser.register(); e.submit(recursiveRunnable); phaser.awaitAdvance(phase); }
Edit: Thanks @depthofreality for pointing out the race condition in my previous example. I am updating it so that executing thread only awaits advance of the current phase as it blocks for the recursive function to complete.
The phase number won't trip until the number of arrive
s == register
s. Since prior to each recursive call invokes register
a phase increment will happen when all invocations are complete.
If number of tasks in the tree of recursive tasks is initially unknown, perhaps the easiest way would be to implement your own synchronization primitive, some kind of "inverse semaphore", and share it among your tasks. Before submitting each task you increment a value, when task is completed, it decrements that value, and you wait until the value is 0.
Implementing it as a separate primitive explicitly called from tasks decouples this logic from the thread pool implementation and allows you to submit several independent trees of recursive tasks into the same pool.
Something like this:
public class InverseSemaphore { private int value = 0; private Object lock = new Object(); public void beforeSubmit() { synchronized(lock) { value++; } } public void taskCompleted() { synchronized(lock) { value--; if (value == 0) lock.notifyAll(); } } public void awaitCompletion() throws InterruptedException { synchronized(lock) { while (value > 0) lock.wait(); } } }
Note that taskCompleted()
should be called inside a finally
block, to make it immune to possible exceptions.
Also note that beforeSubmit()
should be called by the submitting thread before the task is submitted, not by the task itself, to avoid possible "false completion" when old tasks are completed and new ones not started yet.
EDIT: Important problem with usage pattern fixed.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With