TaskRejectedException in ThreadPoolTaskExecutor

I am trying to call an API asynchronously using Spring's @Async with a ThreadPoolTaskExecutor, configured in my thread config as follows:

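import java.util.concurrent.Executor;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class ThreadConfig extends AsyncConfigurerSupport {

    @Value("${core.pool.size}")
    private int corePoolSize;

    @Value("${max.pool.size}")
    private int maxPoolSize;

    @Value("${queue.capacity}")
    private int queueCapacity;

    @Override
    @Bean
    public Executor getAsyncExecutor() {

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix("default_task_executor_thread");
        executor.initialize();

        return executor;
    }
}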

The settings here are:

corePoolSize = 5;
maxPoolSize = 10;
queueCapacity = 10;

I'm calling the Async service as follows:

for (String path : testList) {
    Future<Boolean> pro = services.invokeAPI(path);
}

The testList has about 50 records.

When I run this, the program starts 10 threads and calls the invokeAPI method 10 times, after which it fails with:

org.springframework.core.task.TaskRejectedException: Executor[java.util.concurrent.ThreadPoolExecutor@3234ad78[Running, pool size = 10, active threads = 10, queued tasks = 10, completed tasks = 0]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$1@5c17b70

I assumed it would wait for the current tasks to complete and reuse the threads, rather than throwing an exception and terminating the program.

What should I do so that all 50 records call the invokeAPI method?

Edit: the number of records in testList can change.

Any suggestions please?

Akshay Chopra asked Mar 15 '18 01:03



3 Answers

Hi @Akshay Chopra,

Building on @Shaize's answer, first define a rejected-execution handler:

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().put(r);
        }
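        catch (InterruptedException e) {
            // restore the interrupt flag so the caller can still observe the interruption
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("There was an unexpected InterruptedException whilst waiting to add a Runnable in the executor's blocking queue", e);
        }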
    }
}

To run the tasks on multiple threads without a TaskRejectedException, register this RejectedExecutionHandler on the executor, see below:

@Configuration
@EnableAsync
public class ThreadConfig extends AsyncConfigurerSupport {

    @Value("${core.pool.size}")
    private int corePoolSize;

    @Value("${max.pool.size}")
    private int maxPoolSize;

    @Value("${queue.capacity}")
    private int queueCapacity;

    @Override
    @Bean
    public Executor getAsyncExecutor() {

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setThreadNamePrefix("default_task_executor_thread");

        // add a rejected execution handler
        executor.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());

        executor.initialize();

        return executor;
    }
}

Another way is to use a reactive Flux with a synchronous method instead of @Async:

Flux.just(dtos.toArray()) // in my case an ArrayList
    .parallel(corePoolSize) // 8 in my case
    .runOn(Schedulers.parallel())
    .subscribe(dto -> computeService.compute((CastYourObject) dto));

and you're done.

Best.

Raul Urcan answered Sep 27 '22 21:09



One way to approach this is to implement a RejectedExecutionHandler policy such as the following:

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class BlockCallerExecutionPolicy implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // BlockingQueue.put blocks until there is room on the queue
            executor.getQueue().put(r);
        }
        catch (InterruptedException e) {
            // restore the interrupt flag so the caller can still observe the interruption
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Unexpected InterruptedException while waiting to add Runnable to ThreadPoolExecutor queue...", e);
        }
    }
}

This causes the calling thread (likely the main thread) to wait until there is room on the blocking queue.
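To see the effect outside Spring, here is a minimal sketch using a plain java.util.concurrent.ThreadPoolExecutor with the same sizes as in the question (5 core threads, 10 max threads, queue capacity 10). The class name BlockingPolicyDemo, the nested BlockOnFullQueue handler and the runAll method are hypothetical names chosen for illustration, and the 10 ms sleep only stands in for the API call.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingPolicyDemo {

    // Hypothetical handler with the same idea as the policy above:
    // block the submitting thread until the queue has room.
    static class BlockOnFullQueue implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
            }
        }
    }

    // Submits taskCount short tasks and returns how many actually completed.
    static int runAll(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 10, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10), new BlockOnFullQueue());
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(10); // stand-in for the real API call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown(); // queued tasks still run after shutdown()
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(50)); // prints 50
    }
}
```

One caveat with this approach: a task placed on the queue after the executor has been shut down will never run, so the handler is best used with an executor that outlives the submitting loop.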

Shaize answered Sep 27 '22 21:09



This happens because of the pool and queue sizes you are using. With a queue capacity of 10 and a maximum of 10 threads, the executor can hold at most 20 tasks at a time (10 running and 10 in the queue). Any task submitted beyond that is rejected until a slot frees up.
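The arithmetic can be checked with a small sketch against a plain java.util.concurrent.ThreadPoolExecutor, which is what Spring's ThreadPoolTaskExecutor wraps. RejectionDemo and countAccepted are hypothetical names used only for this illustration. Every task blocks on a latch, so nothing finishes while the loop is submitting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {

    // Submits taskCount tasks that all block until released, and returns
    // how many the pool accepted before it started rejecting.
    static int countAccepted(int core, int max, int queueCapacity, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity)); // default handler is AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        for (int i = 0; i < taskCount; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            } catch (RejectedExecutionException e) {
                // all threads are busy and the queue is full
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(countAccepted(5, 10, 10, 50)); // prints 20
    }
}
```

The first 5 tasks start core threads, the next 10 fill the queue, the next 5 start extra threads up to the maximum of 10, and the 21st is rejected.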

There are various ways to solve this problem:

  1. Use an unbounded queue, i.e. don't specify a queue capacity, so that the queue can hold all the tasks. As threads become free, the queued tasks are executed. Note that with an unbounded queue the pool never grows beyond corePoolSize, because extra threads are only created when the queue is full.
  2. Provide a RejectedExecutionHandler that does something with the rejected tasks, e.g. runs them on the caller thread or discards them, depending on the use case. Java already provides CallerRunsPolicy, AbortPolicy, DiscardPolicy and DiscardOldestPolicy. You can set one using executor#setRejectedExecutionHandler.
  3. Provide your own blocking thread pool executor that blocks until there is room for more tasks (using a Semaphore).
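As a rough illustration of option 2, the sketch below uses the built-in ThreadPoolExecutor.CallerRunsPolicy on a plain ThreadPoolExecutor with the sizes from the question. CallerRunsDemo and runAll are hypothetical names, and the short sleep only stands in for the real work. How many tasks end up on the caller thread depends on timing, so the example only counts completions.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CallerRunsDemo {

    // Submits taskCount short tasks and returns how many completed.
    // Tasks the pool cannot accept are run by the submitting thread itself.
    static int runAll(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 10, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10),
                new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(10); // stand-in for the real API call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown(); // queued tasks still run after shutdown()
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(50)); // prints 50
    }
}
```

With Spring's ThreadPoolTaskExecutor the equivalent would be executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()) before initialize(). Because the caller is busy running a task, it also stops submitting for a while, which throttles the loop naturally.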

Here is an example of a blocking executor:

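import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingExecutor extends ThreadPoolExecutor {

    // One permit per task that may be running or queued at the same time.
    private final Semaphore semaphore;

    public BlockingExecutor(final int corePoolSize, final int poolSize, final int queueSize) {
        super(corePoolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        semaphore = new Semaphore(poolSize + queueSize);
    }

    @Override
    public void execute(final Runnable task) {
        boolean acquired = false;
        boolean interrupted = false;
        do {
            try {
                // Blocks the caller until a running or queued task finishes.
                semaphore.acquire();
                acquired = true;
            } catch (final InterruptedException e) {
                // Keep waiting, but remember the interrupt for the caller.
                interrupted = true;
            }
        } while (!acquired);
        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        try {
            super.execute(task);
        } catch (final RejectedExecutionException e) {
            // The task never started, so return its permit.
            semaphore.release();
            throw e;
        }
    }

    @Override
    protected void afterExecute(final Runnable r, final Throwable t) {
        super.afterExecute(r, t);
        semaphore.release();
    }
}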
Sneh answered Sep 27 '22 20:09
