I am running into an issue where if I attempt to resize a ThreadPoolExecutor
's core pool size to a different number after the pool has been created, then intermittently, some tasks are rejected with a RejectedExecutionException
even though I never submit more than queueSize + maxPoolSize
number of tasks.
The problem that I am trying to solve is to extend ThreadPoolExecutor
that resizes its core threads based on the pending executions sitting in the thread pool's queue. I need this because by default a ThreadPoolExecutor
will create a new Thread
only if the queue is full.
Here is a small self-contained Pure Java 8 program that demonstrates the problem.
import static java.lang.Math.max;
import static java.lang.Math.min;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolResizeTest {
public static void main(String[] args) throws Exception {
// increase the number of iterations if unable to reproduce
// for me 100 iterations have been enough
int numberOfExecutions = 100;
for (int i = 1; i <= numberOfExecutions; i++) {
executeOnce();
}
}
private static void executeOnce() throws Exception {
int minThreads = 1;
int maxThreads = 5;
int queueCapacity = 10;
ThreadPoolExecutor pool = new ThreadPoolExecutor(
minThreads, maxThreads,
0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
new ThreadPoolExecutor.AbortPolicy()
);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
0, 10, TimeUnit.MILLISECONDS);
CompletableFuture<Void> taskBlocker = new CompletableFuture<>();
try {
int totalTasksToSubmit = queueCapacity + maxThreads;
for (int i = 1; i <= totalTasksToSubmit; i++) {
// following line sometimes throws a RejectedExecutionException
pool.submit(() -> {
// block the thread and prevent it from completing the task
taskBlocker.join();
});
// Thread.sleep(10); //enabling even a small sleep makes the problem go away
}
} finally {
taskBlocker.complete(null);
scheduler.shutdown();
pool.shutdown();
}
}
/**
* Resize the thread pool if the number of pending tasks are non-zero.
*/
private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
int pendingExecutions = pool.getQueue().size();
int approximateRunningExecutions = pool.getActiveCount();
/*
* New core thread count should be the sum of pending and currently executing tasks
* with an upper bound of maxThreads and a lower bound of minThreads.
*/
int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));
pool.setCorePoolSize(newThreadCount);
pool.prestartAllCoreThreads();
}
}
Why should the pool ever throw a RejectedExecutionException if I never submit more that the queueCapacity+maxThreads. I am never changing the max threads so by ThreadPoolExecutor's definition, it should either accommodate the task in a Thread or to the queue.
Of course, if I never resize the pool, then the thread pool never rejects any submissions. This is also hard to debug since adding any sort of delays in the submissions makes the problem go away.
Any pointers on how to fix the RejectedExecutionException?
Once 'max' number of threads are reached, no more will be created, and new tasks will be queued until a thread is available to run them.
Starting thread pool size is 1, core pool size is 5, max pool size is 10 and the queue is 100. As requests come in, threads will be created up to 5 and then tasks will be added to the queue until it reaches 100. When the queue is full new threads will be created up to maxPoolSize .
Call cancel() on the Future to Cancel a Task You can cancel tasks submitted to the ThreadPoolExecutor by calling the cancel() function on the Future object. Recall that you will receive a Future object when you submit your task to the thread pool by calling the submit() function.
Let's see the Java ThreadPoolExecutor methods are given below. The afterExecute () method is invoked after execution of the given Runnable. This method is invoked by the same thread that executed the task. The beforeExecute () method invoked before executing the given Runnable in the given thread.
The rules for the size of a ThreadPoolExecutor 's pool are generally miss-understood, because it doesn't work the way that you think it ought to or in the way that you want it to. Take this example. Starting thread pool size is 1, core pool size is 5, max pool size is 10 and the queue is 100.
We can use ThreadPoolExecutor to create thread pool in Java. Java thread pool manages the collection of Runnable threads. The worker threads execute Runnable threads from the queue. java.util.concurrent.Executors provide factory and support methods for java.util.concurrent.Executor interface to create the thread pool in java.
Let's test the default configuration of ThreadPoolTaskExecutor, which defines a corePoolSize of one thread, an unbounded maxPoolSize, and an unbounded queueCapacity. As a result, we expect that no matter how many tasks we start, we'll only have one thread running:
Here is a scenario why this is happening:
In my example I use minThreads = 0, maxThreads = 2 and queueCapacity = 2 to make it shorter. The first command gets submitted, this is done in the method execute:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
for this command workQueue.offer(command) than addWorker(null, false) is executed. The worker thread first takes this command out of the queue in the thread run method, so at this time the queue still has one command,
The second command gets submitted this time workQueue.offer(command) is executed. Now the queue is full
Now the ScheduledExecutorService executes the resizeThreadPool method which calls setCorePoolSize with maxThreads. Here is the method setCorePoolSize:
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
This method adds one worker using addWorker(null, true). No there are 2 worker queues running, the maximum and the queue is full.
The third command gets submitted and fails because workQueue.offer(command) and addWorker(command, false) fails, leading to the Exception:
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@24c22fe rejected from java.util.concurrent.ThreadPoolExecutor@cd1e646[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at ThreadPoolResizeTest.executeOnce(ThreadPoolResizeTest.java:60)
at ThreadPoolResizeTest.runTest(ThreadPoolResizeTest.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
I think to solve this problem you should set the capacityof the queue to the maximum of commands you want to execute.
Not sure if this qualifies as bug. This is the behavior when the additional workers threads are created after the queue is full but this has been kind of noted in java docs that caller has to deal with tasks being rejected.
Java docs
Factory for new threads. All threads are created using this factory (via method addWorker). All callers must be prepared for addWorker to fail, which may reflect a system or user's policy limiting the number of threads. Even though it is not treated as an error, failure to create threads may result in new tasks being rejected or existing ones remaining stuck in the queue.
When you resize the core pool size, lets say increase, the additional workers are created (addWorker
method in setCorePoolSize
) and the call to create additional work (addWorker
method from execute
) is rejected when the addWorker
returns false (add Worker
last code snippet) as the enough additional workers are already created by setCorePoolSize
but not run yet to reflect the update in the queue.
Relevant parts
Compare
public void setCorePoolSize(int corePoolSize) {
....
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
public void execute(Runnable command) {
...
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
....
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
}
Use custom retry rejection execution handler ( This should work for your case as you have upper bound as max pool size ). Please adjust as needed.
public static class RetryRejectionPolicy implements RejectedExecutionHandler {
public RetryRejectionPolicy () {}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
while(true)
if(e.getQueue().offer(r)) break;
}
}
}
ThreadPoolExecutor pool = new ThreadPoolExecutor(
minThreads, maxThreads,
0, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(queueCapacity),
new ThreadPoolResizeTest.RetryRejectionPolicy()
);
Also note your use of shutdown is not correct as this will not wait for submitted task to complete execution but use with awaitTermination
instead.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With