In one of my application I'm using the ExecutorService
class to create a fixed thread pool and CountDownLatch
to wait for the threads to complete. This is working fine if the process didn't throw any exception . If there is an exception occurred in any of the threads, I need to stop all the running thread and report the error to the main thread. Can any one please help me to solve this?
This is the sample code I'm using for executing multiple threads.
private void executeThreads()
{
int noOfThreads = 10;
ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);
try
{
CountDownLatch latch = new CountDownLatch(noOfThreads);
for(int i=0; i< noOfThreads; i++){
executor.submit(new ThreadExecutor(latch));
}
latch.await();
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
executor.shutDown();
}
}
This is the Executor Class
public class ThreadExecutor implements Callable<String> {
CountDownLatch latch ;
public ThreadExecutor(CountDownLatch latch){
this.latch = latch;
}
@Override
public String call() throws Exception
{
doMyTask(); // process logic goes here!
this.latch.countDown();
return "Success";
}
=============================================================================
Thank you all :)
I have corrected my class as given below and that is working now.
private void executeThreads()
{
int noOfThreads = 10;
ExecutorService executor = Executors.newFixedThreadPool(noOfThreads);
ArrayList<Future<Object>> futureList = new ArrayList<Future<Object>>(noOfThreads );
try
{
userContext = BSF.getMyContext();
CountDownLatch latch = new CountDownLatch(noOfComponentsToImport);
for(ImportContent artifact:artifactList){
futureList.add(executor.submit(new ThreadExecutor(latch)));
}
latch.await();
for(Future<Object> future : futureList)
{
try
{
future.get();
}
catch(ExecutionException e)
{ //handle it
}
}
}
catch (Exception e) {
//handle it
}
finally
{
executor.shutdown();
try
{
executor.awaitTermination(90000, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
//handle it
}
}
}
Executor Class :
public class ThreadExecutor implements Callable<String> {
private static volatile boolean isAnyError;
CountDownLatch latch ;
public ThreadExecutor(CountDownLatch latch){
this.latch = latch;
}
@Override
public String call() throws Exception
{
try{
if(!isAnyError)
{
doMyTask(); // process logic goes here!
}
}
catch(Exception e)
{
isAnyError = true ;
throw e;
}
finally
{
this.latch.countDown();
}
return "Success";
}
Use an ExecutorCompletionService
, complete with an ExecutorService
that outlives the duration of the tasks (i.e. it doesn't get shut down afterwards):
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
class Threader {
static ExecutorService service = Executors.newCachedThreadPool();
public static void main(String[] args) {
new Threader().start();
service.shutdown();
}
private void start() {
CompletionService<Void> completionService = new ExecutorCompletionService<Void>(
service);
/*
* Holds all the futures for the submitted tasks
*/
List<Future<Void>> results = new ArrayList<Future<Void>>();
for (int i = 0; i < 3; i++) {
final int callableNumber = i;
results.add(completionService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
System.out.println("Task " + callableNumber
+ " in progress");
try {
Thread.sleep(callableNumber * 1000);
} catch (InterruptedException ex) {
System.out.println("Task " + callableNumber
+ " cancelled");
return null;
}
if (callableNumber == 1) {
throw new Exception("Wrong answer for task "
+ callableNumber);
}
System.out.println("Task " + callableNumber + " complete");
return null;
}
}
));
}
boolean complete = false;
while (!complete) {
complete = true;
Iterator<Future<Void>> futuresIt = results.iterator();
while (futuresIt.hasNext()) {
if (futuresIt.next().isDone()) {
futuresIt.remove();
} else {
complete = false;
}
}
if (!results.isEmpty()) {
try {
/*
* Examine results of next completed task
*/
completionService.take().get();
} catch (InterruptedException e) {
/*
* Give up - interrupted.
*/
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
/*
* The task threw an exception
*/
System.out.println("Execution exception " + e.getMessage());
complete = true;
for (Future<Void> future : results) {
if (!future.isDone()) {
System.out.println("Cancelling " + future);
future.cancel(true);
}
}
}
}
}
}
}
Output is something like:
Task 0 in progress
Task 2 in progress
Task 1 in progress
Task 0 complete
Execution exception java.lang.Exception: Wrong answer for task 1
Cancelling java.util.concurrent.FutureTask@a59698
Task 2 cancelled
where Task 2 got cancelled due to the failure of Task 1.
I strongly suggest you use a robust mechanism to count down the latch. Use an all-encompassing try-finally { latch.countDown(); }
Detect errors in threads using a separate mechanism.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With