I'm using java.util.concurrent.ExecutorService with fixed thread pool to execute list of tasks. My list of tasks will typically be around 80 - 150 and I've limited the number of threads running at any time to 10 as shown below:
ExecutorService threadPoolService = Executors.newFixedThreadPool(10);
for ( Runnable task : myTasks )
{
threadPoolService.submit(task);
}
My use case demands that even the completed task should be re-submitted again to the ExecutorService but it should be executed/taken again only when all the already submitted tasks are serviced/completed. That is basically, the tasks submitted should be executed on a rotation-basis. Hence, there will not be either threadPoolService.shutdown()
or threadPoolService.shutdownNow()
call in this case.
My question is, how do I implement ExecutorService servicing rotation-basis tasks?
ThreadPoolExecutor provides an extension point for afterExecution where you can put the job back at the end of the queue.
public class TaskRepeatingThreadPoolExecutor extends ThreadPoolExecutor {
public TaskRepeatingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
this.submit(r);
}
}
You'll have to do a little more work of course to instantiate it yourself without the help of ExecutorService
's handy factory method, but the constructors are simple enough to grok.
The answer is more related to the implementation of the work queue used for the instance of ExecutorService
. So, I'd suggest:
First choose an implementation of java.util.concurrent.BlockingQueue
(an example) that provides a circular queue functionality. NOTE, the reason BlockingQueue
has been chosen is that to wait until the next task is provided to queue; so, in case of circular + blocking queue, you should be careful how to provide the same behavior and functionality.
Instead of using Executors.new...
to create a new ThreadPoolExecutor
use a direct constructor such as
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
This way, unless you command the executor to shutdown
, it will try to fetch the next task from the queue for execution from its work queue which is a circular container for tasks.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With