
Java ThreadPoolExecutor: Updating core pool size dynamically rejects incoming tasks intermittently

I am running into an issue where, if I change a ThreadPoolExecutor's core pool size after the pool has been created, some tasks are intermittently rejected with a RejectedExecutionException, even though I never submit more than queueSize + maxPoolSize tasks.

What I am trying to build is an extension of ThreadPoolExecutor that resizes its core threads based on the number of pending executions sitting in the pool's queue. I need this because, by default, once corePoolSize threads are running, a ThreadPoolExecutor creates a new thread only if the queue is full.
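To illustrate the default behavior described above, here is a minimal sketch. The class name PoolGrowthDemo and the sizes (1 core thread, 3 max threads, queue capacity 2) are assumptions chosen for illustration and are not part of my real code. Every task blocks, so the pool size observed after each submission shows when a new thread is actually created.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {

    /** Returns the pool size observed after each of five blocking submissions. */
    static List<Integer> poolSizesAfterEachSubmit() {
        // Illustrative values: 1 core thread, 3 max threads, queue capacity 2.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2));
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> sizes = new ArrayList<>();
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the task running until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                sizes.add(pool.getPoolSize());
            }
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(poolSizesAfterEachSubmit()); // prints [1, 1, 1, 2, 3]
    }
}
```

The second and third tasks are queued without starting a thread; only the fourth and fifth, which arrive when the queue is full, grow the pool beyond its core size.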

Here is a small self-contained Pure Java 8 program that demonstrates the problem.

import static java.lang.Math.max;
import static java.lang.Math.min;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolResizeTest {

    public static void main(String[] args) throws Exception {
        // increase the number of iterations if unable to reproduce
        // for me 100 iterations have been enough
        int numberOfExecutions = 100;

        for (int i = 1; i <= numberOfExecutions; i++) {
            executeOnce();
        }
    }

    private static void executeOnce() throws Exception {
        int minThreads = 1;
        int maxThreads = 5;
        int queueCapacity = 10;

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                minThreads, maxThreads,
                0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()
        );

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
                0, 10, TimeUnit.MILLISECONDS);
        CompletableFuture<Void> taskBlocker = new CompletableFuture<>();

        try {
            int totalTasksToSubmit = queueCapacity + maxThreads;

            for (int i = 1; i <= totalTasksToSubmit; i++) {
                // following line sometimes throws a RejectedExecutionException
                pool.submit(() -> {
                    // block the thread and prevent it from completing the task
                    taskBlocker.join();
                });
                // Thread.sleep(10); //enabling even a small sleep makes the problem go away
            }
        } finally {
            taskBlocker.complete(null);
            scheduler.shutdown();
            pool.shutdown();
        }
    }

    /**
     * Resize the core pool based on the number of pending and running tasks.
     */
    private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
        int pendingExecutions = pool.getQueue().size();
        int approximateRunningExecutions = pool.getActiveCount();

        /*
         * New core thread count should be the sum of pending and currently executing tasks
         * with an upper bound of maxThreads and a lower bound of minThreads.
         */
        int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));

        pool.setCorePoolSize(newThreadCount);
        pool.prestartAllCoreThreads();
    }
}

Why should the pool ever throw a RejectedExecutionException if I never submit more than queueCapacity + maxThreads tasks? I never change the max threads, so by ThreadPoolExecutor's definition it should accommodate each task either in a thread or in the queue.

Of course, if I never resize the pool, then the thread pool never rejects any submissions. This is also hard to debug, since adding any sort of delay between submissions makes the problem go away.

Any pointers on how to fix the RejectedExecutionException?

Swaranga Sarma asked Mar 11 '20 05:03




2 Answers

Here is a scenario that shows why this happens:

In my example I use minThreads = 0, maxThreads = 2 and queueCapacity = 2 to make it shorter. The first command gets submitted, this is done in the method execute:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

For this command, workQueue.offer(command) and then addWorker(null, false) are executed (corePoolSize is 0, so the first branch is skipped). The worker thread only takes the command out of the queue later, in its run method, so at this point the queue still holds one command.

The second command gets submitted, and this time only workQueue.offer(command) is executed. Now the queue is full.

Now the ScheduledExecutorService executes the resizeThreadPool method which calls setCorePoolSize with maxThreads. Here is the method setCorePoolSize:

public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

This method adds one worker using addWorker(null, true). Now there are 2 worker threads running, which is the maximum, and the queue is full.

The third command gets submitted and fails because both workQueue.offer(command) and addWorker(command, false) fail, leading to the exception:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@24c22fe rejected from java.util.concurrent.ThreadPoolExecutor@cd1e646[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at ThreadPoolResizeTest.executeOnce(ThreadPoolResizeTest.java:60)
at ThreadPoolResizeTest.runTest(ThreadPoolResizeTest.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
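The interleaving described above normally depends on timing, but it can be forced. The sketch below is my own test aid, not code from the question: the class name ForcedRejectionDemo and the gated ThreadFactory are assumptions introduced for illustration. The factory makes every worker thread wait before it enters its run loop, so no worker can take a command from the queue while the three steps are replayed with minThreads = 0, maxThreads = 2 and queueCapacity = 2.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ForcedRejectionDemo {

    /** Returns true if the third of three submissions is rejected. */
    static boolean thirdTaskRejected() {
        CountDownLatch gate = new CountDownLatch(1);
        // Hypothetical test aid: each worker thread waits at the gate before it
        // enters its run loop, so it cannot take a command from the queue yet.
        ThreadFactory gatedFactory = worker -> new Thread(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            worker.run();
        });
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 2, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2), gatedFactory);
        Runnable command = () -> { };
        boolean rejected = false;
        try {
            pool.execute(command);   // queued; one worker is started without a first task
            pool.execute(command);   // queued; the queue is now full
            pool.setCorePoolSize(2); // prestarts a second worker, also without a first task
            pool.execute(command);   // queue full and pool at max: AbortPolicy throws
        } catch (RejectedExecutionException e) {
            rejected = true;
        } finally {
            gate.countDown();        // let the workers run and drain the queue
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(thirdTaskRejected()); // prints true
    }
}
```

Only three commands are submitted although queueCapacity + maxThreads is 4. The rejection happens because both workers were started without a first task and have not yet removed anything from the queue.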

To solve this problem, I think you should set the capacity of the queue to the maximum number of commands you want to execute.

Thomas Krieger answered Oct 23 '22 02:10


I am not sure this qualifies as a bug. It is what happens when the additional worker threads are created after the queue is full, and the Java docs do note that callers have to deal with tasks being rejected.

Java docs

Factory for new threads. All threads are created using this factory (via method addWorker). All callers must be prepared for addWorker to fail, which may reflect a system or user's policy limiting the number of threads. Even though it is not treated as an error, failure to create threads may result in new tasks being rejected or existing ones remaining stuck in the queue.

When you resize the core pool size, say by increasing it, setCorePoolSize creates the additional workers through addWorker. A subsequent call to addWorker from execute then returns false (see the addWorker check in the last code snippet) and the task is rejected, because setCorePoolSize has already created enough workers to reach the limit, but those workers have not yet run and taken tasks off the queue.

Relevant parts

Compare

public void setCorePoolSize(int corePoolSize) {
    ....
    int k = Math.min(delta, workQueue.size());
    while (k-- > 0 && addWorker(null, true)) {
        if (workQueue.isEmpty())
             break;
    }
}

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
    ....
    if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
}

Use a custom rejected-execution handler that retries (this should work in your case because the max pool size gives you an upper bound). Please adjust as needed.

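public static class RetryRejectionPolicy implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Keep retrying until the task fits in the queue. Re-checking
        // isShutdown() on every pass stops the loop if the pool is shut
        // down while we are still retrying.
        while (!e.isShutdown()) {
            if (e.getQueue().offer(r)) {
                break;
            }
        }
    }
}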

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        minThreads, maxThreads,
        0, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(queueCapacity),
        new ThreadPoolResizeTest.RetryRejectionPolicy()
);
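If spinning on offer is a concern, one possible variation is to block on the queue's put method instead. This is a sketch of a different technique, not the handler shown above: the names BlockingRetryDemo and BlockingRetryPolicy are hypothetical, and it assumes the pool keeps at least one live worker to drain the queue. Unlike the handler above, it throws rather than silently dropping the task when the pool is shut down.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingRetryDemo {

    /** Hypothetical variant: blocks the submitter until the queue has room. */
    static class BlockingRetryPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (e.isShutdown()) {
                throw new RejectedExecutionException("pool is shut down");
            }
            try {
                e.getQueue().put(r); // waits for free space instead of spinning
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting", ex);
            }
        }
    }

    /** Submits taskCount short tasks to a tiny pool and returns how many completed. */
    static int runTasks(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        // Illustrative sizes: one thread and a queue of one, so rejections are frequent.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1), new BlockingRetryPolicy());
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(completed::incrementAndGet);
            }
        } finally {
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(20)); // prints 20
    }
}
```

The trade-off is that the submitting thread can block for as long as the queue stays full, so this only suits callers that can tolerate back-pressure.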

Also note that your use of shutdown is not quite right: shutdown() returns immediately and does not wait for the submitted tasks to finish executing. Use it together with awaitTermination instead.
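A minimal sketch of that shutdown pattern follows. The class name GracefulShutdownDemo, the helper shutdownAndAwait and the 5-second timeout are assumptions for illustration; pick a timeout that fits your tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class GracefulShutdownDemo {

    /** Returns true if the pool terminated within the given timeouts. */
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; already queued tasks still run
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // timed out: interrupt tasks that are still running
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Runs ten short tasks and returns how many finished, or -1 on timeout. */
    static int demo() {
        AtomicInteger done = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 10; i++) {
            pool.execute(done::incrementAndGet);
        }
        boolean terminated = shutdownAndAwait(pool, 5, TimeUnit.SECONDS);
        return terminated ? done.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 10
    }
}
```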

s7vr answered Oct 23 '22 01:10