In a Spring web application I have several DAO and service layer beans. One service layer bean has annotated @Async / @Scheduled methods. These methods depend on other (autowired) beans. I have configured two thread pools in XML:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="2" /> <property name="maxPoolSize" value="5" /> <property name="queueCapacity" value="5" /> <property name="waitForTasksToCompleteOnShutdown" value="true" /> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/> </property> </bean> <bean id="taskScheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler"> <property name="poolSize" value="10" /> <property name="waitForTasksToCompleteOnShutdown" value="true" /> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/> </property> </bean> <task:annotation-driven executor="taskExecutor" scheduler="taskScheduler"/>
Everything works as expected. My problem is that I cannot get a clean shutdown of the task pools to work. The tasks operate on the database and on the file system. When I stop the web application it takes some time until it is stopped. This indicates that the waitForTasksToCompleteOnShutdown
property works. However, I get IllegalStateExceptions in the log indicating that some beans are already destroyed but some worker task threads are still executing and they fail because their dependencies are destroyed.
There is a JIRA issue which might be relevant: SPR-5387
My question is: Is there a way to tell Spring to initialize the task executor/scheduler beans last or is there a way to tell Spring to destroy them first?
My understanding is that destruction takes place in reversed init order. Therefore the bean init'ed last will be destroyed first. If the thread pool beans are destroyed first, all currently executing tasks would finish and could still access dependent beans.
I have also tried using the depends-on attribute on the thread pools referring to my service bean which has the @Async and @Scheduled annotations. Seems like they are never executed then and I do not get context initialization errors. I assume the annotated service bean somehow needs these thread pools initialized first and if I use depends-on I reverse the order and make them non-functional.
In the main thread we wait for the latch to countdown: latch. await() . This will block main thread execution. After which you could safely shutdown the thread pool knowing all the work has been completed.
The TaskExecutor was originally created to give other Spring components an abstraction for thread pooling where needed. Components such as the ApplicationEventMulticaster , JMS's AbstractMessageListenerContainer , and Quartz integration all use the TaskExecutor abstraction to pool threads.
ThreadPoolTaskScheduler ThreadPoolTaskScheduler is well suited for internal thread management, as it delegates tasks to the ScheduledExecutorService and implements the TaskExecutor interface – so that single instance of it is able to handle asynchronous potential executions as well as the @Scheduled annotation.
Two ways:
Have a bean implement ApplicationListener<ContextClosedEvent>
. onApplicationEvent()
will get called before the context and all the beans are destroyed.
Have a bean implement Lifecycle or SmartLifecycle. stop()
will get called before the context and all the beans are destroyed.
Either way you can shut down the task stuff before the bean destroying mechanism takes place.
Eg:
@Component public class ContextClosedHandler implements ApplicationListener<ContextClosedEvent> { @Autowired ThreadPoolTaskExecutor executor; @Autowired ThreadPoolTaskScheduler scheduler; @Override public void onApplicationEvent(ContextClosedEvent event) { scheduler.shutdown(); executor.shutdown(); } }
(Edit: Fixed method signature)
I have added below code to terminate tasks you can use it. You may change the retry numbers.
package com.xxx.test.schedulers; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextClosedEvent; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; import com.xxx.core.XProvLogger; @Component class ContextClosedHandler implements ApplicationListener<ContextClosedEvent> , ApplicationContextAware,BeanPostProcessor{ private ApplicationContext context; public Logger logger = XProvLogger.getInstance().x; public void onApplicationEvent(ContextClosedEvent event) { Map<String, ThreadPoolTaskScheduler> schedulers = context.getBeansOfType(ThreadPoolTaskScheduler.class); for (ThreadPoolTaskScheduler scheduler : schedulers.values()) { scheduler.getScheduledExecutor().shutdown(); try { scheduler.getScheduledExecutor().awaitTermination(20000, TimeUnit.MILLISECONDS); if(scheduler.getScheduledExecutor().isTerminated() || scheduler.getScheduledExecutor().isShutdown()) logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has stoped"); else{ logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has not stoped normally and will be shut down immediately"); scheduler.getScheduledExecutor().shutdownNow(); logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has shut down immediately"); } } catch (IllegalStateException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } Map<String, ThreadPoolTaskExecutor> executers = context.getBeansOfType(ThreadPoolTaskExecutor.class); for (ThreadPoolTaskExecutor executor: executers.values()) { int retryCount = 0; while(executor.getActiveCount()>0 && ++retryCount<51){ try { logger.info("Executer "+executor.getThreadNamePrefix()+" is still working with active " + executor.getActiveCount()+" work. Retry count is "+retryCount); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } if(!(retryCount<51)) logger.info("Executer "+executor.getThreadNamePrefix()+" is still working.Since Retry count exceeded max value "+retryCount+", will be killed immediately"); executor.shutdown(); logger.info("Executer "+executor.getThreadNamePrefix()+" with active " + executor.getActiveCount()+" work has killed"); } } @Override public void setApplicationContext(ApplicationContext context) throws BeansException { this.context = context; } @Override public Object postProcessAfterInitialization(Object object, String arg1) throws BeansException { return object; } @Override public Object postProcessBeforeInitialization(Object object, String arg1) throws BeansException { if(object instanceof ThreadPoolTaskScheduler) ((ThreadPoolTaskScheduler)object).setWaitForTasksToCompleteOnShutdown(true); if(object instanceof ThreadPoolTaskExecutor) ((ThreadPoolTaskExecutor)object).setWaitForTasksToCompleteOnShutdown(true); return object; }
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With