I am trying to change Quartz Sequential execution to Parallel Execution.
It is working fine, Performance wise, it is seems good but Spawned (created) threads are not destroyed.
It is Still in Runnable
State; why and How can I fix that?
Please Guide me.
Code is here :
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
logger.error("Result Processing executed");
List<Object[]> lstOfExams = examService.getExamEntriesForProcessingResults();
String timeZone = messageService.getMessage("org.default_timezone", null, Locale.getDefault());
if(lstOfExams!=null&&!lstOfExams.isEmpty()){
ThreadPoolTaskExecutor threadPoolExecuter = new ThreadPoolTaskExecutor();
threadPoolExecuter.setCorePoolSize(lstOfExams.size());
threadPoolExecuter.setMaxPoolSize(lstOfExams.size()+1);
threadPoolExecuter.setBeanName("ThreadPoolTaskExecutor");
threadPoolExecuter.setQueueCapacity(100);
threadPoolExecuter.setThreadNamePrefix("ThreadForUpdateExamResult");
threadPoolExecuter.initialize();
for(Object[] obj : lstOfExams){
if(StringUtils.isNotBlank((String)obj[2]) ){
timeZone = obj[2].toString();
}
try {
Userexams userexams=examService.findUserExamById(Long.valueOf(obj[0].toString()));
if(userexams.getExamresult()==null){
UpdateUserExamDataThread task=new UpdateUserExamDataThread(obj,timeZone);
threadPoolExecuter.submit(task);
}
// testEvaluator.generateTestResultAsPerEvaluator(Long.valueOf(obj[0].toString()), obj[4].toString(), obj[3]==null?null:obj[3].toString(),timeZone ,obj[5].toString() ,obj[1].toString());
// logger.error("Percentage Marks:::::"+result.getPercentageCatScore());
} catch (Exception e) {
Log.error("Exception at ResultProcessingJob extends QuartzJobBean executeInternal(JobExecutionContext context) throws JobExecutionException",e);
continue;
}
}
threadPoolExecuter.shutdown();
}
}
UpdateUserExamDataThread .class
@Component
//@Scope(value="prototype", proxyMode=ScopedProxyMode.TARGET_CLASS)
//public class UpdateUserExamDataThread extends ThreadLocal<String> //implements Runnable {
public class UpdateUserExamDataThread implements Runnable {
private Logger log = Logger.getLogger(UpdateUserExamDataThread.class);
@Autowired
ExamService examService;
@Autowired
TestEvaluator testEvaluator;
private Object[] obj;
private String timeZone;
public UpdateUserExamDataThread(Object[] obj,String timeZone) {
super();
this.obj = obj;
this.timeZone = timeZone;
}
@Override
public void run() {
String threadName=String.valueOf(obj[0]);
log.info("UpdateUserExamDataThread Start For:::::"+threadName);
testEvaluator.generateTestResultAsPerEvaluator(Long.valueOf(obj[0].toString()), obj[4].toString(), obj[3]==null?null:obj[3].toString(),timeZone ,obj[5].toString() ,obj[1].toString());
//update examResult
log.info("UpdateUserExamDataThread End For:::::"+threadName);
}
}
TestEvaluatorImpl.java
@Override
@Transactional
public Examresult generateTestResultAsPerEvaluator(Long userExamId, String evaluatorType, String codingLanguage,String timeZoneFollowed ,String inctenceId ,String userId) {
dbSchema = messageService.getMessage("database.default_schema", null, Locale.getDefault());
try {
//Some Methods
return examResult;
}catch(Exception e){
log.erorr(e);
}
}
I can provide Thread Dump file if needed.
In the main thread we wait for the latch to countdown: latch. await() . This will block main thread execution. After which you could safely shutdown the thread pool knowing all the work has been completed.
ThreadPoolTaskExecutor is a java bean that allows for configuring a ThreadPoolExecutor in a bean style by setting up the values for the instance variables like corePoolSize, maxPoolSize, keepAliveSeconds, queueCapacity and exposing it as a Spring TaskExecutor.
Returns the thread keep-alive time, which is the amount of time which threads in excess of the core pool size may remain idle before being terminated.
As an example, if the thread throws an exception and pool class does not catch this exception, then the thread will simply exit, reducing the size of the thread pool by one. If this repeats many times, then the pool would eventually become empty and no threads would be available to execute other requests.
it seems you create a thread pool in the same size of exams which is not quite optimal.
// Core pool size is = number of exams
threadPoolExecuter.setCorePoolSize(lstOfExams.size());
// Max pool size is just 1 + exam size.
threadPoolExecuter.setMaxPoolSize(lstOfExams.size()+1);
You have to consider that: - If you create a thread pool and started it as many threads as defined in core size started immediately.
In your case I would set the core pool size 5 or 10 (it actually depends on the how many core your target CPU have and/or how IO bound the submitted tasks are).
The max pool size can be double of that but it doesn't effective until the queue is full.
To let the size of live threads decrease after the submitted work done you have to set 2 parameters.
setKeepAliveSeconds(int keepAliveSeconds) : Which let the threads shut down automatically if they are not used along the defined seconds (by default 60 seconds, which is optimal) BUT this is normally only used to shut down threads of non-core pool threads.
To shut down threads of core part after keepAliveSeconds you have to set setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) as true. Which is normally false to keep core pool alive as long as the application is running.
I hope it helps.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With