I am using ExecutorService
for ease of concurrent multithreaded program. Take following code:
while(xxx) { ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); ... Future<..> ... = exService.submit(..); ... }
In my case the problem is that submit()
is not blocking if all NUMBER_THREADS
are occupied. The consequence is that the Task queue is getting flooded by many tasks. The consequence of this is, that shutting down the execution service with ExecutorService.shutdown()
takes ages (ExecutorService.isTerminated()
will be false for long time). Reason is that the task queue is still quite full.
For now my workaround is to work with semaphores to disallow to have to many entries inside the task queue of ExecutorService
:
... Semaphore semaphore=new Semaphore(NUMBER_THREADS); while(xxx) { ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); ... semaphore.aquire(); // internally the task calls a finish callback, which invokes semaphore.release() // -> now another task is added to queue Future<..> ... = exService.submit(..); ... }
I am sure there is a better more encapsulated solution?
Right, this ExecutorService blocks tasks on submission without blocking caller thread. Job just getting submitted and will be processed asynchronously when there will be enough system resources for it.
Here we have a blockingQueue that has a capacity equal to 10. It means that when a producer tries to add an element to an already full queue, depending on a method that was used to add it (offer(), add() or put()), it will block until space for inserting object becomes available. Otherwise, the operations will fail.
You can cancel the task submitted to ExecutorService by simply calling the cancel method on the future submitted when the task is submitted.
The trick is to use a fixed queue size and:
new ThreadPoolExecutor.CallerRunsPolicy()
I also recommend using Guava's ListeningExecutorService. Here is an example consumer/producer queues.
private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { return new ThreadPoolExecutor(nThreads, nThreads, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); }
Anything better and you might want to consider a MQ like RabbitMQ or ActiveMQ as they have QoS technology.
You can call ThreadPoolExecutor.getQueue().size()
to find out the size of the waiting queue. You can take an action if the queue is too long. I suggest running the task in the current thread if the queue is too long to slow down the producer (if that is appropriate).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With