I've been frustrated for some time with the default behavior of ThreadPoolExecutor
which backs the ExecutorService
thread-pools that so many of us use. To quote from the Javadocs:
If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.
What this means is that if you define a thread pool with the following code, it will never start the 2nd thread because the LinkedBlockingQueue
is unbounded.
ExecutorService threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*timeout*/, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Only if you have a bounded queue and the queue is full are any threads above the core number started. I suspect a large number of junior Java multithreaded programmers are unaware of this behavior of the ThreadPoolExecutor.
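For contrast, here is a minimal sketch (my illustration, not code from the original post) of a bounded-queue configuration: with an ArrayBlockingQueue of capacity 10, threads beyond the single core thread are only created once those 10 queue slots are full.
// Illustration only: extra threads (up to 50) are started only after the 10 queue slots fill up.
ExecutorService boundedPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(10 /*bounded queue*/));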
Now I have a specific use case where this is not optimal. I'm looking for ways to work around it without writing my own TPE class.
My requirements are for a web service that is making call-backs to a possibly unreliable 3rd party.
- I don't want a newFixedThreadPool(...) with a large number of threads that are mostly dormant.
- A newCachedThreadPool() won't work either, since it puts no upper bound on the number of threads (both options are sketched below).
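For reference, here is roughly what the two ruled-out options would look like (illustrative only; the thread count of 50 is made up):
// A fixed pool keeps all 50 threads alive even when they are idle.
ExecutorService fixed = Executors.newFixedThreadPool(50);
// A cached pool reuses idle threads but puts no upper bound on how many it creates.
ExecutorService cached = Executors.newCachedThreadPool();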
.How can I work around this limitation in ThreadPoolExecutor
where the queue needs to be bounded and full before more threads will be started? How can I get it to start more threads before queuing tasks?
Edit:
@Flavio makes a good point about using ThreadPoolExecutor.allowCoreThreadTimeOut(true) to have the core threads time out and exit. I considered that, but I still wanted the core-threads feature: I did not want the number of threads in the pool to drop below the core size if possible.
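As I understand that suggestion, it would look roughly like this sketch (mine, not @Flavio's exact code): make every thread a "core" thread and let core threads time out, so the pool grows immediately but can also shrink back when idle.
ThreadPoolExecutor tpe = new ThreadPoolExecutor(50 /*core == max*/, 50 /*max*/,
        60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
tpe.allowCoreThreadTimeOut(true);  // idle "core" threads exit after 60 seconds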
The default configuration is a core pool size of 1, with unlimited max pool size and unlimited queue capacity. This is roughly equivalent to Executors.newSingleThreadExecutor(), since with an unbounded queue the pool never grows past its single core thread.
Starting thread pool size is 1, core pool size is 5, max pool size is 10, and the queue capacity is 100. As requests come in, threads will be created up to 5, and then tasks will be added to the queue until it reaches 100. When the queue is full, new threads will be created up to maxPoolSize. The pool will therefore create a maximum of 10 threads to process 10 requests at a time. As soon as any one of those threads finishes its task, the pool will hand it the 11th request, and so on for the remaining requests.
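Spelled out as code, that configuration would be something like this (a sketch of the numbers described above, not code from any answer):
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        5 /*corePoolSize*/, 10 /*maximumPoolSize*/,
        60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(100 /*queue capacity*/));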
How can I work around this limitation in ThreadPoolExecutor where the queue needs to be bounded and full before more threads will be started?
I believe I have finally found a somewhat elegant (maybe a little hacky) solution to this limitation with ThreadPoolExecutor
. It involves extending LinkedBlockingQueue
to have it return false
for queue.offer(...)
when there are already some tasks queued. If the current threads are not keeping up with the queued tasks, the TPE will add additional threads. If the pool is already at max threads, then the RejectedExecutionHandler
will be called which does the put(...)
into the queue.
It certainly is strange to write a queue where offer(...)
can return false
and put()
never blocks so that's the hack part. But this works well with TPE's usage of the queue so I don't see any problem with doing this.
Here's the code:
// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there are 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            // have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                        "Task " + r + " rejected from " + executor);
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
});
With this mechanism, when I submit tasks to the queue, the ThreadPoolExecutor will:
1. If a task is already waiting in the queue and the pool is below max threads, offer(...) will return false and the TPE will start another thread.
2. If the pool is already at max threads, offer(...) returning false means the task is rejected, so the RejectedExecutionHandler gets called.
3. The RejectedExecutionHandler then puts the task into the queue to be processed by the first available thread in FIFO order.

Although in my example code above, the queue is unbounded, you could also define it as a bounded queue. For example, if you add a capacity of 1000 to the LinkedBlockingQueue (see the sketch after this list) then it will:
1. Scale the threads up to max as before.
2. Then queue up tasks until the queue holds 1000 of them.
3. Then block the caller in put(...) until space becomes available in the queue.
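A sketch of that bounded variant (my illustration; only the constructor argument changes from the code above):
// Same overridden offer(), but now backed by a queue capped at 1000 tasks.
BlockingQueue<Runnable> boundedQueue = new LinkedBlockingQueue<Runnable>(1000) {
    @Override
    public boolean offer(Runnable e) {
        return (size() == 0) ? super.offer(e) : false;
    }
};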
Also, if you needed to use offer(...)
in the RejectedExecutionHandler
then you could use the offer(E, long, TimeUnit)
method instead with Long.MAX_VALUE
as the timeout.
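That variant of the handler might look like this (a hypothetical sketch reusing the threadPool from the code above; note that LinkedBlockingQueue's timed offer(E, long, TimeUnit) does not go through our overridden offer(E), so it still queues normally):
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // Effectively blocks "forever" like put(), but via the timed offer().
            executor.getQueue().offer(r, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
});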
Warning:
If you expect tasks to be added to the executor after it has been shut down, then you may want to be smarter about throwing RejectedExecutionException out of our custom RejectedExecutionHandler when the executor-service has been shut down. Thanks to @RaduToader for pointing this out.
Edit:
Another tweak to this answer could be to ask the TPE whether there are idle threads and only enqueue the item if there are. You would have to make a real class for the queue (instead of an anonymous one) and add an ourQueue.setThreadPoolExecutor(tpe) method to it.
Then your offer(...)
method might look something like:
1. Check if tpe.getPoolSize() == tpe.getMaximumPoolSize(), in which case just call super.offer(...).
2. Else, if tpe.getPoolSize() > tpe.getActiveCount(), then call super.offer(...) since there seem to be idle threads.
3. Otherwise, return false to fork another thread.

Maybe this:
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}
Note that the get methods on TPE are expensive since they access volatile fields or (in the case of getActiveCount()) lock the TPE and walk the thread-list. Also, there are race conditions here that may cause a task to be enqueued improperly or another thread forked when there was an idle thread.
I've already got two other answers on this question, but I suspect this one is the best.
It's based on the technique of the currently accepted answer, namely:
1. Override the queue's offer() method to (sometimes) return false,
2. which causes the ThreadPoolExecutor to either spawn a new thread or reject the task, and
3. have the RejectedExecutionHandler actually queue the task on rejection.

The problem is when offer() should return false. The currently accepted answer returns false when the queue has a couple of tasks on it, but as I've pointed out in my comment there, this causes undesirable effects. Alternatively, if you always return false, you'll keep spawning new threads even when you have threads waiting on the queue.
The solution is to use Java 7 LinkedTransferQueue
and have offer()
call tryTransfer()
. When there is a waiting consumer thread the task will just get passed to that thread. Otherwise, offer()
will return false and the ThreadPoolExecutor
will spawn a new thread.
BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        return tryTransfer(e);
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
});
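As a quick sanity check (my own usage sketch, not part of the answer), submitting a burst of slow tasks should show the pool growing well past one thread before anything sits waiting in the queue:
for (int i = 0; i < 20; i++) {
    threadPool.execute(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);  // simulate a slow call-back
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    });
}
System.out.println("pool size = " + threadPool.getPoolSize());  // should be well above 1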