Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Handling Exceptions for ThreadPoolExecutor

Tags:

I have the following code snippet that basically scans through the list of task that needs to be executed and each task is then given to the executor for execution.

The JobExecutor in turn creates another executor (for doing db stuff...reading and writing data to queue) and completes the task.

JobExecutor returns a Future<Boolean> for the tasks submitted. When one of the task fails, I want to gracefully interrupt all the threads and shutdown the executor by catching all the exceptions. What changes do I need to do?

public class DataMovingClass {     private static final AtomicInteger uniqueId = new AtomicInteger(0);    private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator();       ThreadPoolExecutor threadPoolExecutor  = null ;     private List<Source> sources = new ArrayList<Source>();      private static class IDGenerator extends ThreadLocal<Integer> {         @Override         public Integer get() {             return uniqueId.incrementAndGet();         }   }    public void init(){      // load sources list    }    public boolean execute() {      boolean succcess = true ;      threadPoolExecutor = new ThreadPoolExecutor(10,10,                 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),                 new ThreadFactory() {                     public Thread newThread(Runnable r) {                         Thread t = new Thread(r);                         t.setName("DataMigration-" + uniqueNumber.get());                         return t;                     }// End method                 }, new ThreadPoolExecutor.CallerRunsPolicy());       List<Future<Boolean>> result = new ArrayList<Future<Boolean>>();       for (Source source : sources) {                     result.add(threadPoolExecutor.submit(new JobExecutor(source)));      }       for (Future<Boolean> jobDone : result) {                 try {                     if (!jobDone.get(100000, TimeUnit.SECONDS) && success) {                         // in case of successful DbWriterClass, we don't need to change                         // it.                         success = false;                     }                 } catch (Exception ex) {                     // handle exceptions                 }             }    }    public class JobExecutor implements Callable<Boolean>  {          private ThreadPoolExecutor threadPoolExecutor ;         Source jobSource ;         public SourceJobExecutor(Source source) {             this.jobSource = source;             threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),                     new ThreadFactory() {                         public Thread newThread(Runnable r) {                             Thread t = new Thread(r);                             t.setName("Job Executor-" + uniqueNumber.get());                             return t;                         }// End method                     }, new ThreadPoolExecutor.CallerRunsPolicy());         }          public Boolean call() throws Exception {             boolean status = true ;              System.out.println("Starting Job = " + jobSource.getName());             try {                          // do the specified task ;                }catch (InterruptedException intrEx) {                 logger.warn("InterruptedException", intrEx);                 status = false ;             } catch(Exception e) {                 logger.fatal("Exception occurred while executing task "+jobSource.getName(),e);                 status = false ;             }            System.out.println("Ending Job = " + jobSource.getName());             return status ;         }     } }    
like image 691
jagamot Avatar asked Mar 31 '10 16:03

jagamot


People also ask

Is ThreadPoolExecutor faster?

ThreadPoolExecutor Can Be Slower for CPU-Bound Tasks Using the ThreadPoolExecutor for a CPU-bound task can be slower than not using it. This is because Python threads are constrained by the Global Interpreter Lock, or GIL.

What if thread in a thread pool throws an exception?

As an example, if the thread throws an exception and pool class does not catch this exception, then the thread will simply exit, reducing the size of the thread pool by one. If this repeats many times, then the pool would eventually become empty and no threads would be available to execute other requests.

Is ThreadPoolExecutor thread safe Python?

ThreadPoolExecutor Thread-Safety Although the ThreadPoolExecutor uses threads internally, you do not need to work with threads directly in order to execute tasks and get results. Nevertheless, when accessing resources or critical sections, thread-safety may be a concern.


1 Answers

When you submit a task to the executor, it returns you a FutureTask instance.

FutureTask.get() will re-throw any exception thrown by the task as an ExecutorException.

So when you iterate through the List<Future> and call get on each, catch ExecutorException and invoke an orderly shutdown.

like image 172
matt b Avatar answered Oct 25 '22 03:10

matt b