I need to implement a thread pool in Java (java.util.concurrent) whose number of threads is at some minimum value when idle, grows up to an upper bound (but never further) when jobs are submitted into it faster than they finish executing, and shrinks back to the lower bound when all jobs are done and no more jobs are submitted.
How would you implement something like that? I imagine that this would be a fairly common usage scenario, but apparently the java.util.concurrent.Executors
factory methods can only create fixed-size pools and pools that grow unboundedly when many jobs are submitted. The ThreadPoolExecutor
class provides corePoolSize
and maximumPoolSize
parameters, but its documentation seems to imply that the only way to ever have more than corePoolSize
threads at the same time is to use a bounded job queue, in which case, if you've reached maximumPoolSize
threads, you'll get job rejections which you have to deal with yourself? I came up with this:
//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(minSize));
...
//submitting jobs
for (Runnable job : ...) {
while (true) {
try {
pool.submit(job);
System.out.println("Job " + job + ": submitted");
break;
} catch (RejectedExecutionException e) {
// maxSize jobs executing concurrently atm.; re-submit new job after short wait
System.out.println("Job " + job + ": rejected...");
try {
Thread.sleep(300);
} catch (InterruptedException e1) {
}
}
}
}
Am I overlooking something? Is there a better way to do this? Also, depending on one's requirements, it might be problematic that the above code will not finish until at least (I think) (total number of jobs) - maxSize
jobs have finished. So if you want to be able to submit an arbitrary number of jobs into the pool and proceed immediately without waiting for any of them to finish, I don't see how you could do that without having a dedicated "job sumitting" thread that manages the required unbounded queue to hold all the submitted jobs. AFAICS, if you're using an unbounded queue for the ThreadPoolExecutor itself, its thread count will never grow beyond corePoolSize.
To use thread pools, we first create a object of ExecutorService and pass a set of tasks to it. ThreadPoolExecutor class allows to set the core and maximum pool size. The runnables that are run by a particular thread are executed sequentially.
I would advise you to use a ThreadPool instead of creating a new Thread. You can have a beginning of answer in Oracle's documentation about thread pools.
The ExecutorService interface contains a large number of methods to control the progress of the tasks and manage the termination of the service. Using this interface, we can submit the tasks for execution and also control their execution using the returned Future instance.
When growing and shrinking comes together with thread, there is only one name which comes to my mind: CachedThreadPool from java.util.concurrent package.
ExecutorService executor = Executors.newCachedThreadPool();
CachedThreadPool() can reuse the thread, as well as create new threads when needed. And yes, if a thread is idle for 60 seconds, CachedThreadPool will kill it. So this is quite lightweight – growing and shrinking in your words!
One trick that might help you is to assign a RejectedExecutionHandler
that uses the same thread to submit the job into the blocking queue. That will block the current thread and remove the need for some sort of loop.
See my answer here:
How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?
Here's the rejection handler copied from that answer.
final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will call the rejected
// handler when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// this will block if the queue is full
executor.getQueue().put(r);
}
});
You should then be able to make use of the core/max thread count as long as you realize that the bounded blocking queue that you use first fills up before any threads are created above the core threads. So if you have 10 core threads and you want the 11th job to start the 11th thread you will need to have a blocking queue with a size of 0 unfortunately (maybe a SynchronousQueue
). I feel that this is a real limitation in the otherwise great ExecutorService
classes.
Set maximumPoolSize
to Integer.MAX_VALUE
. If you ever have more than 2 billion threads...well, good luck with that.
Anyway, the Javadoc of ThreadPoolExecutor
states:
By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).
With a similarly unbounded task queue like a LinkedBlockingQueue
, this should have arbitrarily large capacity.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With