
ExecutorService, standard way to avoid the task queue getting too full

I am using ExecutorService to simplify a concurrent, multithreaded program. Take the following code:

while(xxx) {
    ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
    ...
    Future<..> ... = exService.submit(..);
    ...
}

In my case the problem is that submit() does not block when all NUMBER_THREADS are busy. As a result, the task queue gets flooded with tasks, and shutting down the executor service with ExecutorService.shutdown() takes ages (ExecutorService.isTerminated() stays false for a long time) because the task queue is still quite full.

For now my workaround is to use a semaphore to keep too many entries from piling up in the task queue of the ExecutorService:

...
Semaphore semaphore = new Semaphore(NUMBER_THREADS);

while(xxx) {
    ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
    ...
    semaphore.acquire();
    // internally the task calls a finish callback, which invokes semaphore.release()
    // -> now another task is added to queue
    Future<..> ... = exService.submit(..);
    ...
}
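For reference, the workaround above can be written out as a self-contained sketch. The class name SemaphoreThrottleDemo, the method runThrottled, the 5 ms sleep standing in for real work, and the in-flight counter are illustrative assumptions and not part of the original program. The pool is also created once, outside the loop, in this version.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: throttles submissions with a semaphore.
class SemaphoreThrottleDemo {
    static final int NUMBER_THREADS = 4;

    /** Submits taskCount short tasks; returns the highest number of tasks pending at once. */
    static int runThrottled(int taskCount) {
        ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
        Semaphore semaphore = new Semaphore(NUMBER_THREADS);
        AtomicInteger inFlight = new AtomicInteger();
        int maxInFlight = 0;
        try {
            for (int i = 0; i < taskCount; i++) {
                // Blocks the producer while NUMBER_THREADS tasks are still pending.
                semaphore.acquire();
                maxInFlight = Math.max(maxInFlight, inFlight.incrementAndGet());
                exService.submit(() -> {
                    try {
                        Thread.sleep(5); // placeholder for real work (assumption)
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        semaphore.release(); // lets the producer submit the next task
                    }
                });
            }
            exService.shutdown();
            exService.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exService.shutdownNow(); // no-op if the pool has already terminated
        }
        return maxInFlight;
    }

    public static void main(String[] args) {
        System.out.println("max tasks pending at once: " + runThrottled(100));
    }
}
```

Because a permit is taken before each submit() and returned only when the task finishes, the number of submitted but unfinished tasks should never exceed NUMBER_THREADS, so shutdown() has very little left to drain.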

I am sure there is a better, more encapsulated solution?

manuel aldana asked Feb 11 '10 21:02



2 Answers

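The trick is to use a fixed queue size together with a rejection policy that makes the submitting thread run the task itself when the queue is full, which throttles the producer: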

new ThreadPoolExecutor.CallerRunsPolicy() 

I also recommend using Guava's ListeningExecutorService. Here is an example with producer and consumer executors:

private ListeningExecutorService producerExecutorService =
        MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private ListeningExecutorService consumerExecutorService =
        MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));

// Fixed-size pool backed by a bounded, fair queue; once the queue is full,
// CallerRunsPolicy makes the submitting thread run the task itself.
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  5000L, TimeUnit.MILLISECONDS,
                                  new ArrayBlockingQueue<Runnable>(queueSize, true),
                                  new ThreadPoolExecutor.CallerRunsPolicy());
}

If you need anything more sophisticated than that, you might want to consider a message queue such as RabbitMQ or ActiveMQ, as they provide quality-of-service (QoS) controls.
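To see the bounded queue and CallerRunsPolicy working together, here is a minimal sketch using only the JDK (no Guava). The class name CallerRunsDemo, the pool size of 2, the queue size of 3, and the 2 ms sleep are assumptions chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: bounded queue plus CallerRunsPolicy.
class CallerRunsDemo {
    static final int QUEUE_SIZE = 3;

    /** Returns {number of tasks completed, largest queue size observed by the producer}. */
    static int[] run(int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(QUEUE_SIZE),
                new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicInteger completed = new AtomicInteger();
        int maxQueued = 0;
        for (int i = 0; i < taskCount; i++) {
            // When the queue is full, execute() runs the task in this thread,
            // so the loop cannot get ahead of the workers.
            executor.execute(() -> {
                try {
                    Thread.sleep(2); // placeholder for real work (assumption)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
            maxQueued = Math.max(maxQueued, executor.getQueue().size());
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), maxQueued};
    }

    public static void main(String[] args) {
        int[] result = run(100);
        System.out.println("completed=" + result[0] + ", max queued=" + result[1]);
    }
}
```

The queue can never hold more than QUEUE_SIZE tasks, so shutdown() has only a handful of tasks left to finish. One trade-off to keep in mind: while the producer thread is busy running a task, it is not submitting new ones, so the workers may briefly go idle.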

Adam Gent answered Sep 18 '22 06:09


You can call getQueue().size() on the ThreadPoolExecutor to find out how many tasks are waiting, and take action if the queue is too long. I suggest running the task in the current thread in that case, which slows down the producer (if that is appropriate).
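A rough sketch of this idea follows. The class name QueueLengthThrottle, the helper submitOrRunInline, and the threshold MAX_QUEUED = 10 are hypothetical names and values picked for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: checks the queue length before submitting.
class QueueLengthThrottle {
    static final int MAX_QUEUED = 10; // assumed threshold

    /** Queues the task, or runs it in the calling thread if the backlog is too long.
     *  Returns true if the task was run inline. */
    static boolean submitOrRunInline(ThreadPoolExecutor executor, Runnable task) {
        if (executor.getQueue().size() >= MAX_QUEUED) {
            task.run(); // slows the producer down while the workers catch up
            return true;
        }
        executor.execute(task);
        return false;
    }

    /** Returns {number of tasks completed, number of tasks run inline}. */
    static int[] run(int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        AtomicInteger completed = new AtomicInteger();
        int ranInline = 0;
        for (int i = 0; i < taskCount; i++) {
            boolean inline = submitOrRunInline(executor, () -> {
                try {
                    Thread.sleep(2); // placeholder for real work (assumption)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
            if (inline) {
                ranInline++;
            }
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), ranInline};
    }

    public static void main(String[] args) {
        int[] result = run(200);
        System.out.println("completed=" + result[0] + ", run inline=" + result[1]);
    }
}
```

Note that the size check and the execute() call are not atomic, so with several producer threads the queue can briefly grow past the threshold. With a single producer, as in the question, the limit holds.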

Peter Lawrey answered Sep 18 '22 06:09