I'm fairly new to Spring Batch, so this may be a lack of understanding on my part. I want to understand how to dynamically increase and decrease the number of threads while my job is running, using the ThreadPoolTaskExecutor in conjunction with the ThreadPoolExecutor. I've tried to subclass both the ThreadPoolTaskExecutor and the ThreadPoolExecutor to gain access to beforeExecute() and afterExecute(), which would allow me to terminate threads if the corePoolSize is decreased, using an approach that is listed on this site.
What I don't seem to understand is that when I override the initializeExecutor() method, which returns an ExecutorService, it apparently does not set the (private, internal) threadPoolExecutor variable in the parent class (ThreadPoolTaskExecutor). It only sets the private ExecutorService executor field (from the ExecutorConfigurationSupport class).
Since threadPoolExecutor is not a protected member, I cannot gain access to it. Without it being set, I end up getting a "ThreadPoolExecutor not initialized" error from within the Spring Framework when I run the job.
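import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class MyThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    @Override
    protected ExecutorService initializeExecutor(ThreadFactory tf, RejectedExecutionHandler reh) {
        BlockingQueue<Runnable> queue = createQueue(Integer.MAX_VALUE);
        // Pass the thread factory and rejection handler through to match the subclass constructor.
        MyThreadPoolExecutor tp_executor = new MyThreadPoolExecutor(this.getCorePoolSize(), this.getMaxPoolSize(), this.getKeepAliveSeconds(), TimeUnit.SECONDS, queue, tf, reh);
        // if you look at the parent class (ThreadPoolTaskExecutor) it performs this call next:
        // this.threadPoolExecutor = executor;
        // that is a private member with no ability to set via any methods.
        return tp_executor;
    }
}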
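import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    public MyThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTimeout, TimeUnit timeunit, BlockingQueue<Runnable> workQueue, ThreadFactory tf, RejectedExecutionHandler reh) {
        super(corePoolSize, maxPoolSize, keepAliveTimeout, timeunit, workQueue, tf, reh);
    }
    // Hook invoked on the worker thread just before it runs the given job.
    @Override
    protected void beforeExecute(final Thread thread, final Runnable job) {
        ...
    }
}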
Can someone explain what I am missing in my approach?
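I assume you want to use one number of threads in one job step and a different number in another. A simple way to achieve that is to declare two separate executors, each sized for its step, and let idle threads time out so that neither pool holds threads when its step is not running. Then inject the first executor into one step and the second into the other.
One caveat about the settings: ThreadPoolTaskExecutor uses an unbounded queue by default, and with a corePoolSize of zero such a pool never grows beyond a single thread, because extra threads are only started once the queue is full. ThreadPoolExecutor also rejects allowCoreThreadTimeOut(true) when the keep-alive time is zero. The configuration here therefore sets corePoolSize equal to maxPoolSize and uses a short, non-zero keep-alive (one second, an arbitrary choice).
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class Conf {
    @Bean
    public TaskExecutor executorA(@Value("${first.number.of.threads}") int numberOfThreads) {
        return executor(numberOfThreads);
    }
    @Bean
    public TaskExecutor executorB(@Value("${second.number.of.threads}") int numberOfThreads) {
        return executor(numberOfThreads);
    }
    private TaskExecutor executor(int numberOfThreads) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Core equals max: with the default unbounded queue, only core threads are ever started.
        executor.setCorePoolSize(numberOfThreads);
        executor.setMaxPoolSize(numberOfThreads);
        // Let core threads time out too, so an idle pool shrinks back to zero threads.
        executor.setAllowCoreThreadTimeOut(true);
        // Must be greater than zero when core threads are allowed to time out.
        executor.setKeepAliveSeconds(1);
        return executor;
    }
}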
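If the pool really does need to change size while a step is running, subclassing may not be necessary at all. java.util.concurrent.ThreadPoolExecutor can be resized through setCorePoolSize() and setMaximumPoolSize(), and as far as I can tell ThreadPoolTaskExecutor documents its own setCorePoolSize() and setMaxPoolSize() as modifiable at runtime and forwards them to the underlying pool. Here is a minimal sketch of the resizing behaviour using only the JDK, so it runs without Spring. The class name ResizablePoolDemo and the helpers resize() and demo() are made up for illustration and are not part of any library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Spring or Spring Batch.
final class ResizablePoolDemo {

    // Sets core and max size together, ordering the calls so core <= max always holds.
    static void resize(ThreadPoolExecutor pool, int threads) {
        if (threads > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(threads);
            pool.setCorePoolSize(threads);
        } else {
            pool.setCorePoolSize(threads);
            pool.setMaximumPoolSize(threads);
        }
    }

    // Returns the pool sizes observed at start, after growing, and after shrinking.
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Six tasks that block until released: two run, four wait in the queue.
            for (int i = 0; i < 6; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            int initial = pool.getPoolSize();

            // Growing: extra threads are started because tasks are waiting in the queue.
            resize(pool, 4);
            int grown = pool.getPoolSize();

            // Shrinking: busy threads are not killed, they exit after finishing their task.
            resize(pool, 1);
            release.countDown();

            // Poll briefly instead of assuming how quickly the excess threads exit.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return new int[] {initial, grown, pool.getPoolSize()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] sizes = demo();
        System.out.println(sizes[0] + " -> " + sizes[1] + " -> " + sizes[2]);
    }
}
```

The pool itself retires surplus threads once they become idle, so no beforeExecute() or afterExecute() override is needed just to shrink it. With a Spring ThreadPoolTaskExecutor the equivalent calls would presumably be setMaxPoolSize() and setCorePoolSize() on the bean, in the same order.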