Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to stop all the running threads, if one of those throws an Exception?

In one of my application I'm using the ExecutorService class to create a fixed thread pool and CountDownLatch to wait for the threads to complete. This is working fine if the process didn't throw any exception . If there is an exception occurred in any of the threads, I need to stop all the running thread and report the error to the main thread. Can any one please help me to solve this?

This is the sample code I'm using for executing multiple threads.

    private void executeThreads()
    {
        int noOfThreads = 10;
        ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);     
        try      
       {
        CountDownLatch latch = new CountDownLatch(noOfThreads);
        for(int i=0; i< noOfThreads; i++){
         executor.submit(new ThreadExecutor(latch));
        }
        latch.await();           
       }
       catch(Exception e)
       {
        e.printStackTrace();
       }
       finally
       {
        executor.shutDown();
       }
   }

This is the Executor Class

     public class ThreadExecutor implements Callable<String> {
        CountDownLatch latch ;
        public ThreadExecutor(CountDownLatch latch){
            this.latch = latch;
        }   

    @Override
    public String call() throws Exception
    {
        doMyTask(); // process logic goes here!
        this.latch.countDown();
        return "Success";
    }

=============================================================================

Thank you all :)

I have corrected my class as given below and that is working now.

private void executeThreads()
    {
        int noOfThreads = 10;
        ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);     
       ArrayList<Future<Object>> futureList = new ArrayList<Future<Object>>(noOfThreads );
    try
    {
        userContext = BSF.getMyContext();
        CountDownLatch latch = new CountDownLatch(noOfComponentsToImport);

        for(ImportContent artifact:artifactList){
            futureList.add(executor.submit(new ThreadExecutor(latch)));
        }

        latch.await();

        for(Future<Object> future : futureList)
        {
                  try
                  {
                      future.get();                 
                   }
                   catch(ExecutionException e)
                   {   //handle it               
                    }
        }           

    }
    catch (Exception e) {
       //handle it
    }
    finally
    {
        executor.shutdown();      

        try
        {
            executor.awaitTermination(90000, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
           //handle it
        }
    }
   }

Executor Class :

public class ThreadExecutor implements Callable<String> {
        private static volatile boolean isAnyError;
        CountDownLatch latch ;
        public ThreadExecutor(CountDownLatch latch){
            this.latch = latch;
        }   

    @Override
    public String call() throws Exception
    {

      try{
            if(!isAnyError)
            { 
               doMyTask(); // process logic goes here!
            }
     }
     catch(Exception e)
     {
        isAnyError = true ;
        throw e;
      }
      finally
      {
        this.latch.countDown();
       }
        return "Success";
    }
like image 403
Achu S Avatar asked Aug 16 '12 09:08

Achu S


2 Answers

Use an ExecutorCompletionService, complete with an ExecutorService that outlives the duration of the tasks (i.e. it doesn't get shut down afterwards):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class Threader {

    static ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        new Threader().start();
        service.shutdown();
    }

    private void start() {
        CompletionService<Void> completionService = new ExecutorCompletionService<Void>(
                service);
        /*
         * Holds all the futures for the submitted tasks
         */
        List<Future<Void>> results = new ArrayList<Future<Void>>();

        for (int i = 0; i < 3; i++) {
            final int callableNumber = i;

            results.add(completionService.submit(new Callable<Void>() {

                                                     @Override
                                                     public Void call() throws Exception {
                                                         System.out.println("Task " + callableNumber
                                                                 + " in progress");
                                                         try {
                                                             Thread.sleep(callableNumber * 1000);
                                                         } catch (InterruptedException ex) {
                                                             System.out.println("Task " + callableNumber
                                                                     + " cancelled");
                                                             return null;
                                                         }
                                                         if (callableNumber == 1) {
                                                             throw new Exception("Wrong answer for task "
                                                                     + callableNumber);
                                                         }
                                                         System.out.println("Task " + callableNumber + " complete");
                                                         return null;
                                                     }
                                                 }

            ));
        }

        boolean complete = false;
        while (!complete) {
            complete = true;
            Iterator<Future<Void>> futuresIt = results.iterator();
            while (futuresIt.hasNext()) {
                if (futuresIt.next().isDone()) {
                    futuresIt.remove();
                } else {
                    complete = false;
                }
            }

            if (!results.isEmpty()) {
                try {
                /*
                 * Examine results of next completed task
                 */
                    completionService.take().get();
                } catch (InterruptedException e) {
                /*
                 * Give up - interrupted.
                 */
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                /*
                 * The task threw an exception
                 */
                    System.out.println("Execution exception " + e.getMessage());
                    complete = true;
                    for (Future<Void> future : results) {
                        if (!future.isDone()) {
                            System.out.println("Cancelling " + future);
                            future.cancel(true);
                        }
                    }
                }
            }
        }

    }
}

Output is something like:

Task 0 in progress
Task 2 in progress
Task 1 in progress
Task 0 complete
Execution exception java.lang.Exception: Wrong answer for task 1
Cancelling java.util.concurrent.FutureTask@a59698
Task 2 cancelled

where Task 2 got cancelled due to the failure of Task 1.

like image 86
artbristol Avatar answered Oct 21 '22 19:10

artbristol


I strongly suggest you use a robust mechanism to count down the latch. Use an all-encompassing try-finally { latch.countDown(); } Detect errors in threads using a separate mechanism.

like image 27
Marko Topolnik Avatar answered Oct 21 '22 18:10

Marko Topolnik