Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

ExecutorService, how to wait for all tasks to finish

People also ask

How do I wait till ExecutorService to finish?

When using an Executor, we can shut it down by calling the shutdown() or shutdownNow() methods. Although, it won't wait until all threads stop executing. Waiting for existing threads to complete their execution can be achieved by using the awaitTermination() method.

Which method waits till all threads initiated by the service () method have finished?

parallelStream(). forEach(req -> makeRequest(req)); It's super simple and readable. Behind the scenes it is using default JVM's fork join pool which means that it will wait for all the threads to finish before continuing.

How do I stop all threads in ExecutorService?

Using shutdownNow() The shutdownNow() is a hard signal to destroy ExecutorService immediately along with stopping the execution of all in-progress and queued tasks. Use this method, when we want the application to stop processing all tasks immediately.


The simplest approach is to use ExecutorService.invokeAll() which does what you want in a one-liner. In your parlance, you'll need to modify or wrap ComputeDTask to implement Callable<>, which can give you quite a bit more flexibility. Probably in your app there is a meaningful implementation of Callable.call(), but here's a way to wrap it if not using Executors.callable().

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

As others have pointed out, you could use the timeout version of invokeAll() if appropriate. In this example, answers is going to contain a bunch of Futures which will return nulls (see definition of Executors.callable(). Probably what you want to do is a slight refactoring so you can get a useful answer back, or a reference to the underlying ComputeDTask, but I can't tell from your example.

If it isn't clear, note that invokeAll() will not return until all the tasks are completed. (i.e., all the Futures in your answers collection will report .isDone() if asked.) This avoids all the manual shutdown, awaitTermination, etc... and allows you to reuse this ExecutorService neatly for multiple cycles, if desired.

There are a few related questions on SO:

  • How to wait for all threads to finish

  • Return values from java threads

  • invokeAll() not willing to accept a Collection<Callable<t>>

  • Do I need to synchronize?

None of these are strictly on-point for your question, but they do provide a bit of color about how folks think Executor/ExecutorService ought to be used.


If you want to wait for all tasks to complete, use the shutdown method instead of wait. Then follow it with awaitTermination.

Also, you can use Runtime.availableProcessors to get the number of hardware threads so you can initialize your threadpool properly.


If waiting for all tasks in the ExecutorService to finish isn't precisely your goal, but rather waiting until a specific batch of tasks has completed, you can use a CompletionService — specifically, an ExecutorCompletionService.

The idea is to create an ExecutorCompletionService wrapping your Executor, submit some known number of tasks through the CompletionService, then draw that same number of results from the completion queue using either take() (which blocks) or poll() (which does not). Once you've drawn all the expected results corresponding to the tasks you submitted, you know they're all done.

Let me state this one more time, because it's not obvious from the interface: You must know how many things you put into the CompletionService in order to know how many things to try to draw out. This matters especially with the take() method: call it one time too many and it will block your calling thread until some other thread submits another job to the same CompletionService.

There are some examples showing how to use CompletionService in the book Java Concurrency in Practice.


If you want to wait for the executor service to finish executing, call shutdown() and then, awaitTermination(units, unitType), e.g. awaitTermination(1, MINUTE). The ExecutorService does not block on it's own monitor, so you can't use wait etc.


You could wait jobs to finish on a certain interval:

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

Or you could use ExecutorService.submit(Runnable) and collect the Future objects that it returns and call get() on each in turn to wait for them to finish.

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

InterruptedException is extremely important to handle properly. It is what lets you or the users of your library terminate a long process safely.


Just use

latch = new CountDownLatch(noThreads)

In each thread

latch.countDown();

and as barrier

latch.await();

Root cause for IllegalMonitorStateException:

Thrown to indicate that a thread has attempted to wait on an object's monitor or to notify other threads waiting on an object's monitor without owning the specified monitor.

From your code, you have just called wait() on ExecutorService without owning the lock.

Below code will fix IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

Follow one of below approaches to wait for completion of all tasks, which have been submitted to ExecutorService.

  1. Iterate through all Future tasks from submit on ExecutorService and check the status with blocking call get() on Future object

  2. Using invokeAll on ExecutorService

  3. Using CountDownLatch

  4. Using ForkJoinPool or newWorkStealingPool of Executors(since java 8)

  5. Shutdown the pool as recommended in oracle documentation page

    void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
       // Wait a while for existing tasks to terminate
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
       }
    } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
    }
    

    If you want to gracefully wait for all tasks for completion when you are using option 5 instead of options 1 to 4, change

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
    

    to

    a while(condition) which checks for every 1 minute.