Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Limit the number of threads and wait if max threads reached using @Async annotation

Tags:

I am using Spring's Java Configuration with AsyncConfigurer:

@Configuration
@EnableAsync
public class AppConfig implements AsyncConfigurer {

@Override
public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(10);
        executor.setThreadNamePrefix("MyExecutor-");
        executor.initialize();
        return executor;
    }
}

Now suppose I have a method with @Async annotation and suppose its called 2 times already and 2 threads are still running. As per my understanding, any new call to it will be added in the queue with capacity 10. Now if I receive 11th Task, what will be the behaviour of it? will it reject the task as stated here: https://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html? or will the caller wait for a queue slot to get empty?

My Requirement is to not exeed a fix number of threads spawned using @Async method and to make the caller wait if max number of threads are reached. Will this be achieved if I use ConcurrentTaskExecutor with a fixed thread pool of a particular size?

like image 768
KCK Avatar asked May 07 '19 08:05

KCK


2 Answers

I wanted to limit the number of possible threads along with not loosing any message. This requirement of mine was not fulfilled from the existing answers and I found another way to do it. Hence, posting it as an answer:


I made a Executor Bean as follows:

@Bean(name = "CustomAsyncExecutor")
public Executor customThreadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(5);
    executor.setQueueCapacity(0);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("Async_Thread_");
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.initialize();
    return executor;
}

And then using

@Async("CustomAsyncExecutor")
public void methodName(){
....
}

Given that when the threads are busy & when queue is full, new tasks get rejected,

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy())

helped me that when my 5 threads are busy, my invoker thread will execute the task & since my invoker thread is within the async function, it won't take any new tasks. Thus, I won't loose my tasks without increasing the queue size.

like image 126
KCK Avatar answered Nov 15 '22 05:11

KCK


According to how ThreadPoolExecutor works, the 11th task will be rejected because when the queue is full, the executor try to increase the pool size and that if it cannot do it because the max value was reached, it rejects the task.
You can find the information there in the Spring documentation :

The main idea is that, when a task is submitted, the executor first tries to use a free thread if the number of active threads is currently less than the core size. If the core size has been reached, the task is added to the queue, as long as its capacity has not yet been reached. Only then, if the queue’s capacity has been reached, does the executor create a new thread beyond the core size. If the max size has also been reached, then the executor rejects the task.

About your requirement :

My Requirement is to not exeed a fix number of threads spawned using @Async method and to make the caller wait if max number of threads are reached. Will this be achieved if I use ConcurrentTaskExecutor with a fixed thread pool of a particular size?

So increase the queue size consequently and leave the same values for core and max pool size. You could also use an unbounded queue that is the default value for the queue size parameter but beware because it could lead to OutOfMemoryError if too many tasks are stacked in the queue.

like image 45
davidxxx Avatar answered Nov 15 '22 06:11

davidxxx