Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement an ExecutorService to execute tasks on a rotation-basis?

I'm using java.util.concurrent.ExecutorService with fixed thread pool to execute list of tasks. My list of tasks will typically be around 80 - 150 and I've limited the number of threads running at any time to 10 as shown below:

ExecutorService threadPoolService = Executors.newFixedThreadPool(10);

for ( Runnable task : myTasks ) 
{     
    threadPoolService.submit(task); 
}

My use case demands that even the completed task should be re-submitted again to the ExecutorService but it should be executed/taken again only when all the already submitted tasks are serviced/completed. That is basically, the tasks submitted should be executed on a rotation-basis. Hence, there will not be either threadPoolService.shutdown() or threadPoolService.shutdownNow() call in this case.

My question is, how do I implement ExecutorService servicing rotation-basis tasks?

like image 909
Gnanam Avatar asked Apr 27 '12 07:04

Gnanam


2 Answers

ThreadPoolExecutor provides an extension point for afterExecution where you can put the job back at the end of the queue.

public class TaskRepeatingThreadPoolExecutor extends ThreadPoolExecutor {

    public TaskRepeatingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        this.submit(r);
    }
}

You'll have to do a little more work of course to instantiate it yourself without the help of ExecutorService's handy factory method, but the constructors are simple enough to grok.

like image 99
Affe Avatar answered Sep 22 '22 11:09

Affe


The answer is more related to the implementation of the work queue used for the instance of ExecutorService. So, I'd suggest:

  1. First choose an implementation of java.util.concurrent.BlockingQueue (an example) that provides a circular queue functionality. NOTE, the reason BlockingQueue has been chosen is that to wait until the next task is provided to queue; so, in case of circular + blocking queue, you should be careful how to provide the same behavior and functionality.

  2. Instead of using Executors.new... to create a new ThreadPoolExecutor use a direct constructor such as

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

This way, unless you command the executor to shutdown, it will try to fetch the next task from the queue for execution from its work queue which is a circular container for tasks.

like image 21
nobeh Avatar answered Sep 20 '22 11:09

nobeh