
ThreadPoolExecutor policy

I'm trying to use a ThreadPoolExecutor to schedule tasks, but I'm running into some problems with its policies. Here's its stated behavior:

  1. If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
  2. If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
  3. If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

The behavior I want is this:

  1. same as above
  2. If at least corePoolSize but fewer than maximumPoolSize threads are running, the Executor prefers adding a new thread over queuing, and prefers using an idle thread over adding a new thread.
  3. same as above

Basically I don't want any tasks to be rejected; I want them to be queued in an unbounded queue. But I do want to have up to maximumPoolSize threads. If I use an unbounded queue, it never creates threads after it reaches corePoolSize. If I use a bounded queue, it rejects tasks. Is there any way around this?
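The two failure modes described above can be reproduced with a small sketch. The class name `DefaultPolicyDemo`, the pool sizes (2 core, 4 maximum), and the task count are illustrative assumptions, not part of the question. Every task blocks on a latch, so the snapshot is taken while all workers are busy.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DefaultPolicyDemo {
    /** Submits blocking tasks and returns {pool size, queued tasks, rejected tasks}. */
    static int[] run(BlockingQueue<Runnable> queue, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        // corePoolSize = 2, maximumPoolSize = 4 (values chosen for illustration)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 1, TimeUnit.SECONDS, queue);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> awaitQuietly(release));
            } catch (RejectedExecutionException e) {
                rejected++; // default AbortPolicy: queue full and pool at maximum
            }
        }
        int[] snapshot = { pool.getPoolSize(), pool.getQueue().size(), rejected };
        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        return snapshot;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: the pool stays at corePoolSize and everything else queues.
        System.out.println(Arrays.toString(run(new LinkedBlockingQueue<>(), 10))); // [2, 8, 0]
        // Bounded queue of 2: the pool grows to maximumPoolSize, then tasks are rejected.
        System.out.println(Arrays.toString(run(new ArrayBlockingQueue<>(2), 10))); // [4, 2, 4]
    }
}
```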

What I'm thinking about now is running the ThreadPoolExecutor on a SynchronousQueue, but not feeding tasks directly to it - instead feeding them to a separate unbounded LinkedBlockingQueue. Then another thread feeds from the LinkedBlockingQueue into the Executor, and if one gets rejected, it simply tries again until it is not rejected. This seems like a pain and a bit of a hack, though - is there a cleaner way to do this?
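For reference, the feeder idea described above might look roughly like the following. This is only a sketch under assumptions: the class name `FeederSketch`, the 5 ms retry delay, and the pool sizes in `demo` are all made up for illustration, and a task that is being retried at the moment of shutdown is simply dropped.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FeederSketch {
    private final BlockingQueue<Runnable> backlog = new LinkedBlockingQueue<>();
    private final ThreadPoolExecutor pool;
    private final Thread feeder;

    FeederSketch(int core, int max) {
        // SynchronousQueue: hand off to an idle worker, else start a thread, else reject.
        pool = new ThreadPoolExecutor(core, max, 30, TimeUnit.SECONDS, new SynchronousQueue<>());
        feeder = new Thread(this::feed, "feeder");
        feeder.setDaemon(true);
        feeder.start();
    }

    /** Never rejects: tasks wait in the unbounded backlog. */
    void submit(Runnable task) {
        backlog.add(task);
    }

    private void feed() {
        try {
            while (true) {
                Runnable task = backlog.take();
                while (true) {
                    try {
                        pool.execute(task);
                        break;
                    } catch (RejectedExecutionException allBusy) {
                        Thread.sleep(5); // every worker is busy: retry shortly
                    }
                }
            }
        } catch (InterruptedException stop) {
            // interrupted by shutdown(): stop feeding
        }
    }

    void shutdown() {
        feeder.interrupt();
        pool.shutdown();
    }

    /** Runs short tasks and returns {completed tasks, largest pool size}. */
    static int[] demo(int tasks) {
        FeederSketch sketch = new FeederSketch(2, 4);
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            sketch.submit(() -> {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.countDown();
            });
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int[] result = { tasks - (int) done.getCount(), sketch.pool.getLargestPoolSize() };
        sketch.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = demo(20);
        System.out.println("completed=" + r[0] + ", largest pool size=" + r[1]);
    }
}
```

The retry loop is what makes this feel like a hack: the feeder polls instead of being notified when a worker becomes free.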

Joe K asked Aug 05 '10 21:08


2 Answers

It probably isn't necessary to micro-manage the thread pool in the way you are asking.

A cached thread pool will re-use idle threads while also allowing potentially unlimited concurrent threads. This could, of course, lead to runaway performance degradation from context-switching overhead during bursty periods.

Executors.newCachedThreadPool();
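A quick way to see the "potentially unlimited" part is to submit tasks that all block at once and look at the pool size. The class name `CachedPoolDemo` is hypothetical, and the cast relies on an assumption: OpenJDK happens to return a plain ThreadPoolExecutor from this factory method, which the Javadoc does not promise, so the sketch returns -1 if that does not hold.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class CachedPoolDemo {
    /** Returns the pool size while `tasks` tasks are all blocked, or -1 if unknown. */
    static int run(int tasks) {
        ExecutorService service = Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            service.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Assumption: the returned service is a ThreadPoolExecutor (true in OpenJDK).
        int size = service instanceof ThreadPoolExecutor
                ? ((ThreadPoolExecutor) service).getPoolSize()
                : -1;
        release.countDown();
        service.shutdown();
        return size;
    }

    public static void main(String[] args) {
        // No idle thread is available for any task, so each one gets its own thread.
        System.out.println("threads for 10 concurrent tasks: " + run(10));
    }
}
```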

A better option is to place a limit on the total number of threads while discarding the notion of ensuring idle threads are used first. The configuration changes would be:

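ThreadPoolExecutor executor = new ThreadPoolExecutor(N, N, // corePoolSize = maximumPoolSize = N
        aReasonableTimeDuration, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true); // idle core threads may terminate after the keep-alive time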

Reasoning through this scenario: if the executor has fewer than corePoolSize threads, then it must not be very busy, and if the system is not very busy, there is little harm in spinning up a new thread. With this configuration, your ThreadPoolExecutor always creates a new worker while it is under the maximum number of workers allowed. Only once the maximum number of workers is running will tasks be queued and picked up by workers that are idly waiting. If a worker waits aReasonableTimeDuration without receiving a task, it is allowed to terminate. With reasonable limits on the pool size (after all, there are only so many CPUs) and a reasonably large timeout (to keep threads from needlessly terminating), the desired benefits will likely be seen.

The final option is hackish. ThreadPoolExecutor internally uses BlockingQueue.offer to determine whether the queue has capacity. A custom implementation of BlockingQueue could always reject the offer attempt. When the ThreadPoolExecutor fails to offer a task to the queue, it tries to create a new worker. If a new worker cannot be created, the RejectedExecutionHandler is called. At that point, a custom RejectedExecutionHandler could force a put into the custom BlockingQueue.
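A minimal sketch of this configuration in action, assuming an unbounded LinkedBlockingQueue as the work queue (the answer does not name one) and a deliberately short 100 ms keep-alive so the shrinking is observable. The class name `CoreTimeoutDemo` and all the numbers are illustrative.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
    static ThreadPoolExecutor newPool(int n, long keepAliveMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                n, n, keepAliveMillis, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // requires a keep-alive time greater than zero
        return pool;
    }

    /** Returns {pool size while busy, queued tasks while busy, pool size after idling}. */
    static int[] run(int n, int tasks) {
        ThreadPoolExecutor pool = newPool(n, 100);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int busySize = pool.getPoolSize();
        int queued = pool.getQueue().size();
        release.countDown();
        // Wait (up to 5 s) for the idle workers to time out and terminate.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        int idleSize = pool.getPoolSize();
        pool.shutdown();
        return new int[] { busySize, queued, idleSize };
    }

    public static void main(String[] args) {
        // 4 threads are created eagerly, 6 tasks queue, nothing is rejected,
        // and the pool shrinks back to 0 threads once it has been idle.
        System.out.println(Arrays.toString(run(4, 10))); // [4, 6, 0]
    }
}
```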

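import java.util.concurrent.*;

/** Hackish BlockingQueue implementation tightly coupled to ThreadPoolExecutor implementation details. */
class ThreadPoolHackyBlockingQueue implements BlockingQueue<Runnable>, RejectedExecutionHandler {
    // Unbounded queue that actually holds the tasks.
    private final BlockingQueue<Runnable> delegate = new LinkedBlockingQueue<Runnable>();

    // Always refuse, so the executor first tries to start a new worker.
    public boolean offer(Runnable item) {
        return false;
    }

    // Called only when no new worker could be created: queue the task after all.
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            delegate.put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RejectedExecutionException(e);
        }
    }

    //.... delegate methods
}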
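A self-contained variant of the same hack, for experimentation. Instead of writing out every delegate method it subclasses LinkedBlockingQueue and overrides only offer; this is a swapped-in shortcut rather than the code above, and it relies on LinkedBlockingQueue.put not being routed through offer. The class name `EagerThreadQueue`, the shutdown check, and the sizes in `demo` are assumptions added for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class EagerThreadQueue extends LinkedBlockingQueue<Runnable> implements RejectedExecutionHandler {
    @Override
    public boolean offer(Runnable task) {
        return false; // pretend to be full so the executor tries to add a worker
    }

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
        // Added safeguard (not in the original): do not queue work after shutdown.
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("executor has been shut down");
        }
        try {
            put(task); // the pool is at maximumPoolSize: really enqueue the task
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(e);
        }
    }

    /** Returns {pool size while busy, queued tasks while busy, completed tasks}. */
    static int[] demo(int core, int max, int tasks) {
        EagerThreadQueue queue = new EagerThreadQueue();
        // The same object serves as work queue and as rejection handler.
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(core, max, 30, TimeUnit.SECONDS, queue, queue);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.countDown();
            });
        }
        int busySize = pool.getPoolSize();
        int queued = queue.size();
        release.countDown();
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return new int[] { busySize, queued, tasks - (int) done.getCount() };
    }

    public static void main(String[] args) {
        // The pool grows to maximumPoolSize before anything is queued; nothing is rejected.
        System.out.println(Arrays.toString(demo(2, 4, 10))); // [4, 6, 10]
    }
}
```

Note that this still does not prefer an idle thread over a new one: below maximumPoolSize, a new thread is started for every submission.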
Tim Bender answered Oct 19 '22 22:10


Your use case is common, completely legit and unfortunately more difficult than one would expect. For background info you can read this discussion and find a pointer to a solution (also mentioned in the thread) here. Shay's solution works fine.

Generally I'd be a bit wary of unbounded queues; it's usually better to have explicit incoming flow control that degrades gracefully and regulates the ratio of current/remaining work to not overwhelm either producer or consumer.
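One common way to get this kind of flow control from the JDK alone, offered as an illustration and not necessarily what the linked solution does, is a bounded queue combined with ThreadPoolExecutor.CallerRunsPolicy: when the pool and the queue are both full, the submitting thread runs the task itself, which slows the producer down instead of rejecting work. The class name `BackpressureDemo` and the sizes are assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BackpressureDemo {
    /** Returns {tasks run by pool threads, tasks run by the submitting thread}. */
    static int[] run(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),                 // small bounded queue
                new ThreadPoolExecutor.CallerRunsPolicy());  // overflow runs on the caller
        Thread submitter = Thread.currentThread();
        AtomicInteger onPool = new AtomicInteger();
        AtomicInteger onCaller = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                (Thread.currentThread() == submitter ? onCaller : onPool).incrementAndGet();
                try {
                    Thread.sleep(20); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { onPool.get(), onCaller.get() };
    }

    public static void main(String[] args) {
        int[] r = run(10);
        // The split between the two counters depends on timing; the total is always 10.
        System.out.println("pool=" + r[0] + ", caller=" + r[1]);
    }
}
```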

Holger Hoffstätte answered Oct 19 '22 22:10