
ThreadPoolExecutor : Tasks are getting queued up and not submitted

We have a scenario where tasks submitted to a ThreadPoolExecutor are long running. The thread pool is started with core pool size = 5, max pool size = 20 and a queue size of 10. Our application submits around 10 tasks. Most of the time these tasks run for a few minutes or hours and then complete. However, there was a situation where the first 5 tasks hung on I/O. As a result, all 5 core pool threads were occupied, but the ThreadPoolExecutor queue was not full, so the additional 5 tasks never got a chance to run. How can we handle such a scenario? Is a smaller queue a better option in this situation? What would be an optimal queue size when initializing the thread pool?

Also, regarding the hung tasks, is there any way to pull their threads out of the thread pool? That way at least the other tasks would get a chance to run.

user1269597 asked Feb 22 '13 20:02



3 Answers

The overall situation is like this:

core pool size = 5,
max pool size = 20 and 
queue size of 10

10 tasks are submitted, of which:

  1. 5 tasks hung on I/O => all core pool threads are occupied, so there is no idle thread.
  2. 5 tasks remain. These 5 tasks are placed in the queue, since there is no idle thread and the queue can hold 10 tasks. The queued tasks will not execute until either the queue is full (so that threads beyond the core size get created) or one of the core pool threads becomes free.

Hence, your program hangs.

To learn more about the dynamics of ThreadPoolExecutor, see its documentation. The notable points of that documentation are as follows:

  • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
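
The situation from the question can be reproduced with a small sketch. It assumes an ArrayBlockingQueue of capacity 10 (the question does not say which queue type is used) and uses a CountDownLatch as a stand-in for the stuck I/O; the class and method names are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueingDemo {
    /** Submits 10 blocking tasks and returns {pool size, queue size}. */
    static int[] poolAndQueueSizes() throws InterruptedException {
        // core = 5, max = 20, bounded queue of 10 (queue type is an assumption)
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CountDownLatch release = new CountDownLatch(1); // stands in for the hung I/O
        try {
            for (int i = 0; i < 10; i++) {
                executor.execute(() -> {
                    try {
                        release.await(); // block until the demo lets the task finish
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Tasks 1-5 each started a core thread; tasks 6-10 went to the queue.
            return new int[] { executor.getPoolSize(), executor.getQueue().size() };
        } finally {
            release.countDown(); // unblock everything so the pool can shut down
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] sizes = poolAndQueueSizes();
        System.out.println("pool size = " + sizes[0] + ", queued = " + sizes[1]);
        // prints "pool size = 5, queued = 5"
    }
}
```

Even though maximumPoolSize is 20, no sixth thread is created while the queue still has room.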

EDIT
If you wish to increase the core pool size, you can use setCorePoolSize(int corePoolSize). If you increase the core pool size, new threads will, if needed, be started to execute any queued tasks.
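
A rough sketch of the effect of setCorePoolSize, under the same assumptions as the question's setup (core 5, max 20, and an ArrayBlockingQueue of capacity 10, which is my choice of queue type). The blocking tasks and the class name are hypothetical; the short polling loop only gives the newly started threads time to pick up the queued tasks.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CorePoolResizeDemo {
    /** Returns {queued before resize, queued after resize, pool size after resize}. */
    static int[] resize() throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CountDownLatch release = new CountDownLatch(1); // stands in for the hung I/O
        try {
            for (int i = 0; i < 10; i++) {
                executor.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            int queuedBefore = executor.getQueue().size();

            // Must not exceed maximumPoolSize (20 here).
            executor.setCorePoolSize(10);

            // Give the new core threads a moment to take the queued tasks.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (!executor.getQueue().isEmpty() && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return new int[] {
                queuedBefore, executor.getQueue().size(), executor.getPoolSize() };
        } finally {
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = resize();
        System.out.println("queued before = " + r[0]
                + ", queued after = " + r[1] + ", pool size = " + r[2]);
    }
}
```

Note that this only gets the waiting tasks running; the 5 hung tasks still occupy their threads.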

Vishal K answered Oct 09 '22 19:10



The Javadoc for ThreadPoolExecutor states:

Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:

  • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

Unless you exceed your queue size after 5 threads are "hanging", you're not going to get more threads.
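
To illustrate, here is a sketch with the sizes from the question (the ArrayBlockingQueue and the default rejection policy are my assumptions, and the class name is made up). Every task blocks on a latch, so the thread counts are easy to follow: threads beyond the core size appear only once the queue is full, and the 31st task is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueFullDemo {
    /** Returns {pool size after 15 tasks, after 16, after 30, 1 if the 31st was rejected}. */
    static int[] growPastCore() throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // simulate a task that never returns on its own
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            // 5 tasks start core threads, the next 10 fill the queue.
            for (int i = 0; i < 15; i++) executor.execute(blocked);
            int after15 = executor.getPoolSize();

            // The queue is full now, so this task gets a brand-new thread.
            executor.execute(blocked);
            int after16 = executor.getPoolSize();

            // Keep going until maximumPoolSize is reached.
            for (int i = 0; i < 14; i++) executor.execute(blocked);
            int after30 = executor.getPoolSize();

            // Queue full and pool at maximum: the default policy rejects the task.
            int rejected = 0;
            try {
                executor.execute(blocked);
            } catch (RejectedExecutionException e) {
                rejected = 1;
            }
            return new int[] { after15, after16, after30, rejected };
        } finally {
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = growPastCore();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " rejected=" + (r[3] == 1));
        // prints "5 6 20 rejected=true"
    }
}
```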

The real answer is: fix the problem that's causing your threads to hang. Otherwise you're going to have to implement some scheme that uses the Futures returned by submit() to cancel tasks that run too long.
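
One possible shape for such a scheme, as a minimal sketch: wait on the Future with a timeout and call cancel(true) if it expires. The helper name runWithTimeout and the timeout values are made up for the example. Keep in mind that cancel(true) only interrupts the worker thread; a task stuck in a blocking call that ignores interruption (as far as I know this includes classic java.io stream reads) will not stop, so a socket or read timeout may still be needed.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutCancelDemo {
    /** Runs the task; returns true if it finished in time, false if it was cancelled. */
    static boolean runWithTimeout(ExecutorService executor, Runnable task,
                                  long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException {
        Future<?> future = executor.submit(task);
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the thread running the task
            return false;
        }
    }

    /** Returns {slow task completed, fast task completed} on a one-thread pool. */
    static boolean[] demo() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            Runnable slow = () -> {
                try {
                    Thread.sleep(60_000); // an interruptible stand-in for a hung call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            boolean slowDone = runWithTimeout(executor, slow, 200, TimeUnit.MILLISECONDS);
            // The only thread is free again, so the next task can run.
            boolean fastDone = runWithTimeout(executor, () -> { }, 5, TimeUnit.SECONDS);
            return new boolean[] { slowDone, fastDone };
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        boolean[] r = demo();
        System.out.println("slow completed: " + r[0] + ", fast completed: " + r[1]);
    }
}
```

Blocking in get() ties up the submitting thread, so in a real application the waiting would more likely be done by a separate watchdog or a scheduled check.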

Brian Roach answered Oct 09 '22 21:10



I think another approach would be to set the corePoolSize based on the number of tasks waiting in the queue. You can change the corePoolSize at runtime with setCorePoolSize. A sample monitor thread can control your ThreadPoolExecutor. You can also improve this monitor to adjust the degree of parallelism.

    import java.util.concurrent.ThreadPoolExecutor;

    public class ExecutorMonitor extends Thread {

        private final ThreadPoolExecutor executor;
        private final int initialCorePoolSize;

        public ExecutorMonitor(ThreadPoolExecutor executor) {
            this.executor = executor;
            this.initialCorePoolSize = executor.getCorePoolSize();
        }

        @Override
        public void run() {
            while (!isInterrupted()) {
                try {
                    Thread.sleep(5000); // polling interval
                } catch (InterruptedException e) {
                    return; // interrupted: stop monitoring
                }
                int core = executor.getCorePoolSize();
                if (!executor.getQueue().isEmpty()) {
                    // Tasks are waiting: add one core thread, but never go past
                    // maximumPoolSize (newer JDKs reject a larger core size).
                    if (core < executor.getMaximumPoolSize()) {
                        executor.setCorePoolSize(core + 1);
                    }
                } else if (core > initialCorePoolSize) {
                    // Queue is empty: shrink back toward the initial size.
                    executor.setCorePoolSize(core - 1);
                }
            }
        }
    }
celik answered Oct 09 '22 19:10
