Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Creating a dynamic (growing/shrinking) thread pool

I need to implement a thread pool in Java (java.util.concurrent) whose number of threads is at some minimum value when idle, grows up to an upper bound (but never further) when jobs are submitted into it faster than they finish executing, and shrinks back to the lower bound when all jobs are done and no more jobs are submitted.

How would you implement something like that? I imagine that this would be a fairly common usage scenario, but apparently the java.util.concurrent.Executors factory methods can only create fixed-size pools and pools that grow unboundedly when many jobs are submitted. The ThreadPoolExecutor class provides corePoolSize and maximumPoolSize parameters, but its documentation seems to imply that the only way to ever have more than corePoolSize threads at the same time is to use a bounded job queue, in which case, if you've reached maximumPoolSize threads, you'll get job rejections which you have to deal with yourself? I came up with this:

//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(minSize));
...

//submitting jobs
for (Runnable job : ...) {
    while (true) {
        try {
            pool.submit(job);
            System.out.println("Job " + job + ": submitted");
            break;
        } catch (RejectedExecutionException e) {
            // maxSize jobs executing concurrently atm.; re-submit new job after short wait
            System.out.println("Job " + job + ": rejected...");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e1) {
            }
        }
    }
}

Am I overlooking something? Is there a better way to do this? Also, depending on one's requirements, it might be problematic that the above code will not finish until at least (I think) (total number of jobs) - maxSize jobs have finished. So if you want to be able to submit an arbitrary number of jobs into the pool and proceed immediately without waiting for any of them to finish, I don't see how you could do that without having a dedicated "job sumitting" thread that manages the required unbounded queue to hold all the submitted jobs. AFAICS, if you're using an unbounded queue for the ThreadPoolExecutor itself, its thread count will never grow beyond corePoolSize.

like image 550
Olaf Klischat Avatar asked Jun 28 '12 16:06

Olaf Klischat


People also ask

How do you make a thread pool?

To use thread pools, we first create a object of ExecutorService and pass a set of tasks to it. ThreadPoolExecutor class allows to set the core and maximum pool size. The runnables that are run by a particular thread are executed sequentially.

Should you use a thread pool or just create a new thread whenever you need it?

I would advise you to use a ThreadPool instead of creating a new Thread. You can have a beginning of answer in Oracle's documentation about thread pools.

What is ExecutorService how it is used to create pool of threads?

The ExecutorService interface contains a large number of methods to control the progress of the tasks and manage the termination of the service. Using this interface, we can submit the tasks for execution and also control their execution using the returned Future instance.


3 Answers

When growing and shrinking comes together with thread, there is only one name which comes to my mind: CachedThreadPool from java.util.concurrent package.

ExecutorService executor = Executors.newCachedThreadPool();

CachedThreadPool() can reuse the thread, as well as create new threads when needed. And yes, if a thread is idle for 60 seconds, CachedThreadPool will kill it. So this is quite lightweight – growing and shrinking in your words!

like image 126
Kumar Vivek Mitra Avatar answered Nov 05 '22 07:11

Kumar Vivek Mitra


One trick that might help you is to assign a RejectedExecutionHandler that uses the same thread to submit the job into the blocking queue. That will block the current thread and remove the need for some sort of loop.

See my answer here:

How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

Here's the rejection handler copied from that answer.

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
       0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will call the rejected
// handler when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
   }
});

You should then be able to make use of the core/max thread count as long as you realize that the bounded blocking queue that you use first fills up before any threads are created above the core threads. So if you have 10 core threads and you want the 11th job to start the 11th thread you will need to have a blocking queue with a size of 0 unfortunately (maybe a SynchronousQueue). I feel that this is a real limitation in the otherwise great ExecutorService classes.

like image 34
Gray Avatar answered Nov 05 '22 06:11

Gray


Set maximumPoolSize to Integer.MAX_VALUE. If you ever have more than 2 billion threads...well, good luck with that.

Anyway, the Javadoc of ThreadPoolExecutor states:

By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).

With a similarly unbounded task queue like a LinkedBlockingQueue, this should have arbitrarily large capacity.

like image 43
Louis Wasserman Avatar answered Nov 05 '22 06:11

Louis Wasserman