I am trying to call an API asynchronously using Spring's Async and using the ThreadPoolTaskExecutor in my Thread Config which goes:
@Configuration
@EnableAsync
public class ThreadConfig extends AsyncConfigurerSupport {
@Value("${core.pool.size}")
private int corePoolSize;
@Value("${max.pool.size}")
private int maxPoolSize;
@Value("${queue.capacity}")
private int queueCapacity;
@Override
@Bean
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.initialize();
return executor;
}
The settings here are:
corePoolSize = 5;
maxPoolSize = 10;
QueueCapacity = 10;
I'm calling the Async service as follows:
for (String path : testList) {
Future<Boolean> pro = services.invokeAPI(path);
}
The testList has about 50 records.
When I run this, the compiler processes 10 threads and calls the invokeAPI method 10 times after which it gives:
org.springframework.core.task.TaskRejectedException: Executor[java.util.concurrent.ThreadPoolExecutor@3234ad78[Running, pool size = 10, active threads = 10, queued tasks = 10, completed tasks = 0]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$1@5c17b70
I was assuming that it will wait for the current tasks to complete and re-assign the threads instead of throwing me the exception and terminating the program.
What should I do to have all my 50 records call the invokeAPI method?
Edit: the number of records in testList can change.
Any suggestions please?
Hy @Akshay Chopra,
According to @Shaize's response:
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
}
catch (InterruptedException e) {
throw new RejectedExecutionException("There was an unexpected InterruptedException whilst waiting to add a Runnable in the executor's blocking queue", e);
}
}
}
in order to have multi threading without TaskRejectedException, you have to implement a TaskRejectedHnadler, see below:
@Configuration
@EnableAsync
public class ThreadConfig extends AsyncConfigurerSupport {
@Value("${core.pool.size}")
private int corePoolSize;
@Value("${max.pool.size}")
private int maxPoolSize;
@Value("${queue.capacity}")
private int queueCapacity;
@Override
@Bean
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("default_task_executor_thread");
// add a rejected execution handler
executor.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
executor.initialize();
return executor;
}
}
Another way is to use reactive FLUX with synchronously method instead Async:
Flux.just(dtos.toArray()) // in my case an ArrayList
.parallel(corePoolSize) // 8 in my case
.runOn(Schedulers.parallel())
.subscribe(dto -> computeService.compute((CastYourObject) dto));
and you're done.
Best.
One way to approach this is by implementing a RejectedExecutionHandler policy with something like the below
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class BlockCallerExecutionPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// based on the BlockingQueue documentation below should block until able to place on the queue...
executor.getQueue().put(r);
}
catch (InterruptedException e) {
throw new RejectedExecutionException("Unexpected InterruptedException while waiting to add Runnable to ThreadPoolExecutor queue...", e);
}
}
}
What this does is cause the calling thread (likely the main thread) to wait until there's room on the blocking queue.
This is happening because of the size you are using for the pool. Since the size of the queue is 10 and the max threads you can have is 10, therefore after 20 tasks (10 running and 10 in queue) the executor starts rejecting the tasks.
There are various ways to solve this problem.
RejectedExecutionHandler
which does something with the tasks. i.e. Runs them on the caller thread or discard them or something else (Depending on the use case). There are some of them already provided by Java like CallerRunsPolicy
, AbortPolicy
, DiscardPolicy
and DiscardOldestPolicy
. You can specify them like using executor#setRejectedExecutionHandler
.Here is an example of Blocking Executor
public class BlockingExecutor extends ThreadPoolExecutor {
private final Semaphore semaphore;
public BlockingExecutor(final int corePoolSize, final int poolSize, final int queueSize) {
super(corePoolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
semaphore = new Semaphore(poolSize + queueSize);
}
@Override
public void execute(final Runnable task) {
boolean acquired = false;
do {
try {
semaphore.acquire();
acquired = true;
} catch (final InterruptedException e) {
//do something here
}
} while (!acquired);
try {
super.execute(task);
} catch (final RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
protected void afterExecute(final Runnable r, final Throwable t) {
super.afterExecute(r, t);
semaphore.release();
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With