
How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

I am getting data from a queue server and I need to process it and send an acknowledgement. Something like this:

while (true) {
    queueserver.get.data           // pull the next item from the queue server
    ThreadPoolExecutor             // send data to thread
    queueserver.acknowledgement    // acknowledge the item
}

I don't fully understand what happens in threads, but I think this program gets the data, sends it to the thread pool and then immediately acknowledges it. So even though each queue is limited to 200 unacknowledged items, the program will just pull items as fast as it can receive them. This is fine when I run the program on a single server, but with multiple workers it becomes an issue, because the number of items in the thread queue reflects how fast a worker can pull items from the queue server rather than how much work it has done.

Is there anything I can do to somehow make the program wait if the thread queue is full of work?

Lostsoul asked Apr 27 '12 15:04



2 Answers

How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

Instead of an open-ended queue, you can use a BlockingQueue with a limit on it:

BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200);
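
To illustrate what the limit buys you, here is a minimal sketch of how a bounded queue behaves once it is full. The class name `BoundedQueueDemo` and the tiny capacity are made up for the example; only `ArrayBlockingQueue` and its `offer`/`put` methods come from the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueDemo {
    // Fills a queue of the given capacity, then reports whether one more
    // element is accepted.
    static boolean acceptsWhenFull(int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer(i); // succeeds: there is still room
        }
        // The queue is full now: offer() returns false immediately,
        // whereas put() would block until a consumer takes an element.
        return queue.offer(-1);
    }

    public static void main(String[] args) {
        System.out.println(acceptsWhenFull(2));
    }
}
```

A producer that calls `put()` instead of `offer()` simply waits at that point, which is the back-pressure the question asks for.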

In terms of jobs submitted to an ExecutorService, instead of using the default ExecutorServices created using Executors, which use an unbounded queue, you can create your own:

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new ArrayBlockingQueue<Runnable>(200));

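Once the queue fills up and all threads are busy, the executor rejects any new tasks that are submitted. To make the submitter wait instead, you will need to set a RejectedExecutionHandler that puts the task onto the queue itself, which blocks until there is room. Something like: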

final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
           0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit a job while all threads are busy and the queue is full;
// to have it block instead you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      try {
         // this will block if the queue is full
         executor.getQueue().put(r);
      } catch (InterruptedException e) {
         // put() is interruptible: restore the flag and reject the task
         Thread.currentThread().interrupt();
         throw new RejectedExecutionException(
              "Interrupted while queueing " + r, e);
      }
      // check afterwards and throw if pool shutdown
      if (executor.isShutdown()) {
         throw new RejectedExecutionException(
              "Task " + r + " rejected from " + executor);
      }
   }
});

I think it's a major miss that Java doesn't have a ThreadPoolExecutor.CallerBlocksPolicy.
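
For completeness, here is a self-contained sketch of the blocking-handler idea that can be run as is. The class name `BlockingSubmitDemo`, the pool size of 2 and the 5 ms sleep are assumptions chosen for the demo, not part of the question's setup.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingSubmitDemo {
    // Runs `tasks` short jobs through a pool whose queue holds only
    // `capacity` entries and returns how many jobs completed.
    static int runAll(int tasks, int capacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(capacity));
        pool.setRejectedExecutionHandler((r, executor) -> {
            try {
                executor.getQueue().put(r); // the submitting thread waits here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while waiting", e);
            }
        });
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(5); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.incrementAndGet();
            });
        }
        pool.shutdown(); // already queued jobs still run after shutdown()
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(20, 3));
    }
}
```

Because the loop that calls `execute()` stalls whenever the queue is full, a loop that also pulls from the queue server would stop pulling at the same moment, so the number of unacknowledged items stays tied to the work actually in progress.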

Gray answered Nov 01 '22 09:11


If you want the acknowledgment when the worker starts working on the task, you can make a custom ThreadFactory that sends the acknowledgment from the thread before doing the actual work. Alternatively, you can override beforeExecute of a ThreadPoolExecutor.
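
The `beforeExecute` route could look roughly like the sketch below. `beforeExecute(Thread, Runnable)` is a real protected hook of `ThreadPoolExecutor` that runs in the worker thread just before each task; the class names and the `events` list, which stands in for the real call to the queue server, are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AckingExecutorDemo {
    static class AckingExecutor extends ThreadPoolExecutor {
        private final List<String> events; // hypothetical stand-in for the queue server

        AckingExecutor(int threads, List<String> events) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
            this.events = events;
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            // Real code would acknowledge the message carried by r here.
            events.add("ack");
        }
    }

    // Runs a single task and returns the recorded order of events.
    static String runOne() {
        List<String> events = new CopyOnWriteArrayList<>();
        AckingExecutor pool = new AckingExecutor(1, events);
        pool.execute(() -> events.add("work"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return String.join(",", events);
    }

    public static void main(String[] args) {
        System.out.println(runOne());
    }
}
```

In real code the `Runnable` passed to `beforeExecute` would need to carry whatever identifier the queue server expects for the acknowledgement, for example through a small wrapper class.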

If you want the acknowledgment when a new worker is freed up for a new task, I think you can initialize a ThreadPoolExecutor with a SynchronousQueue and a ThreadPoolExecutor.CallerRunsPolicy, or with your own policy where the caller blocks.
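
A rough sketch of the SynchronousQueue plus CallerRunsPolicy combination follows. The class name `CallerRunsDemo` and the single-thread pool are assumptions made to keep the behaviour easy to observe.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {
    // Returns true if the second task ran on the submitting thread
    // because the only worker was still busy with the first task.
    static boolean secondTaskRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<>();

        // First task occupies the single worker until the latch is released.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // A SynchronousQueue holds nothing, so this task is rejected and
        // CallerRunsPolicy runs it right here on the submitting thread.
        pool.execute(() -> ranOn.set(Thread.currentThread()));

        release.countDown();
        pool.shutdown();
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println(secondTaskRunsOnCaller());
    }
}
```

While the caller is busy running a task itself it cannot pull more items from the queue server, so this setup throttles the loop without any explicit blocking code. The trade-off is that the pulling thread does worker duty for the duration of that task.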

trutheality answered Nov 01 '22 11:11