Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java: ExecutorService that blocks on submission after a certain queue size [duplicate]

I am trying to code a solution in which a single thread produces I/O-intensive tasks that can be performed in parallel. Each task have significant in-memory data. So I want to be able limit the number of tasks that are pending at a moment.

If I create ThreadPoolExecutor like this:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,                                   0L, TimeUnit.MILLISECONDS,                                   new LinkedBlockingQueue<Runnable>(maxQueue)); 

Then the executor.submit(callable) throws RejectedExecutionException when the queue fills up and all the threads are already busy.

What can I do to make executor.submit(callable) block when the queue is full and all threads are busy?

EDIT: I tried this:

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 

And it somewhat achieves the effect that I want achieved but in an inelegant way (basically rejected threads are run in the calling thread, so this blocks the calling thread from submitting more).

EDIT: (5 years after asking the question)

To anyone reading this question and its answers, please don't take the accepted answer as one correct solution. Please read through all answers and comments.

like image 583
Tahir Akhtar Avatar asked Dec 23 '10 19:12

Tahir Akhtar


People also ask

Does ExecutorService submit block?

Right, this ExecutorService blocks tasks on submission without blocking caller thread. Job just getting submitted and will be processed asynchronously when there will be enough system resources for it.

Is ExecutorService blocked?

It is not blocking, it is just that in Java as long is there are non-daemon threads the application is not closing: class Test{ public static void main(String[] args){ ExecutorService exec = new Executors.

What are the advantages of using ExecutorService instead of creating threads directly?

ExecutorService abstracts away many of the complexities associated with the lower-level abstractions like raw Thread . It provides mechanisms for safely starting, closing down, submitting, executing, and blocking on the successful or abrupt termination of tasks (expressed as Runnable or Callable ).

How do you handle exceptions in ExecutorService in Java?

If you want to process exceptions thrown by the task, then it is generally better to use Callable rather than Runnable . If Callable. call() throws an exception, this will be wrapped in an ExecutionException and thrown by Future. get() .


2 Answers

I have done this same thing. The trick is to create a BlockingQueue where the offer() method is really a put(). (you can use whatever base BlockingQueue impl you want).

public class LimitedQueue<E> extends LinkedBlockingQueue<E>  {     public LimitedQueue(int maxSize)     {         super(maxSize);     }      @Override     public boolean offer(E e)     {         // turn offer() and add() into a blocking calls (unless interrupted)         try {             put(e);             return true;         } catch(InterruptedException ie) {             Thread.currentThread().interrupt();         }         return false;     }  } 

Note that this only works for thread pool where corePoolSize==maxPoolSize so be careful there (see comments).

like image 197
jtahlborn Avatar answered Oct 08 '22 11:10

jtahlborn


The currently accepted answer has a potentially significant problem - it changes the behavior of ThreadPoolExecutor.execute such that if you have a corePoolSize < maxPoolSize, the ThreadPoolExecutor logic will never add additional workers beyond the core.

From ThreadPoolExecutor.execute(Runnable):

    if (isRunning(c) && workQueue.offer(command)) {         int recheck = ctl.get();         if (! isRunning(recheck) && remove(command))             reject(command);         else if (workerCountOf(recheck) == 0)             addWorker(null, false);     }     else if (!addWorker(command, false))         reject(command); 

Specifically, that last 'else' block willl never be hit.

A better alternative is to do something similar to what OP is already doing - use a RejectedExecutionHandler to do the same put logic:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {     try {         if (!executor.isShutdown()) {             executor.getQueue().put(r);         }     } catch (InterruptedException e) {         Thread.currentThread().interrupt();         throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);     } } 

There are some things to watch out for with this approach, as pointed out in the comments (referring to this answer):

  1. If corePoolSize==0, then there is a race condition where all threads in the pool may die before the task is visible
  2. Using an implementation that wraps the queue tasks (not applicable to ThreadPoolExecutor) will result in issues unless the handler also wraps it the same way.

Keeping those gotchas in mind, this solution will work for most typical ThreadPoolExecutors, and will properly handle the case where corePoolSize < maxPoolSize.

like image 20
Krease Avatar answered Oct 08 '22 13:10

Krease