
How do I implement task prioritization using an ExecutorService in Java 5?

I am implementing a thread pooling mechanism in which I'd like to execute tasks of varying priorities. I'd like to have a nice mechanism whereby I can submit a high priority task to the service and have it be scheduled before other tasks. The priority of the task is an intrinsic property of the task itself (whether I express that task as a Callable or a Runnable is not important to me).

Now, superficially it looks like I could use a PriorityBlockingQueue as the task queue in my ThreadPoolExecutor, but that queue contains Runnable objects, which may or may not be the Runnable tasks I've submitted to it. Moreover, if I've submitted Callable tasks, it's not clear how this would ever map.

Is there a way to do this? I'd really rather not roll my own for this, since I'm far more likely to get it wrong that way.

(An aside: yes, I'm aware of the possibility of starvation for lower-priority jobs in something like this. Extra points (?!) for solutions that have a reasonable guarantee of fairness.)

Chris R asked Apr 30 '09 14:04





2 Answers

I have solved this problem in a reasonable fashion, and I'll describe it below for future reference to myself and anyone else who runs into this problem with the Java Concurrent libraries.

Using a PriorityBlockingQueue to hold tasks for later execution is indeed a step in the right direction. The problem is that the PriorityBlockingQueue must be generically instantiated to contain Runnable instances, and Runnable does not extend Comparable, so there is no compareTo (or similar) to call on it.

On to solving the problem. When creating the Executor, it must be given a PriorityBlockingQueue. The queue should further be given a custom Comparator so that it orders the tasks by priority:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator()); 

Now, a peek at CustomTaskComparator:

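    import java.util.Comparator;

    public class CustomTaskComparator implements Comparator<MyType> {

        @Override
        public int compare(MyType first, MyType second) {
            // "comparison" stands in for your own ordering logic: negative if
            // first sorts before second, positive if after, zero if equal.
            return comparison;
        }

    }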

Everything looks pretty straightforward up to this point. It gets a bit sticky here. Our next problem is to deal with the creation of FutureTasks from the Executor. In the Executor, we must override newTaskFor (a protected hook that AbstractExecutorService has offered since Java 6) like so:

    @Override
    protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
        // Override the default FutureTask creation and retrofit it with
        // a custom task. This is done so that prioritization can be accomplished.
        return new CustomFutureTask(c);
    }

Where c is the Callable task that we're trying to execute. Now, let's have a peek at CustomFutureTask:

    public class CustomFutureTask extends FutureTask {

        private CustomTask task;

        public CustomFutureTask(Callable callable) {
            super(callable);
            this.task = (CustomTask) callable;
        }

        public CustomTask getTask() {
            return task;
        }

    }

Notice the getTask method. We're gonna use that later to grab the original task out of this CustomFutureTask that we've created.

And finally, let's modify the original task that we were trying to execute:

    public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

        private final MyType myType;

        public CustomTask(MyType myType) {
            this.myType = myType;
        }

        @Override
        public MyType call() {
            // Do some things, return something for FutureTask implementation of `call`.
            return myType;
        }

        @Override
        public int compareTo(CustomTask task2) {
            // Comparable<CustomTask> requires a CustomTask parameter; the
            // comparison itself is delegated to the MyType comparator.
            return new CustomTaskComparator().compare(this.myType, task2.myType);
        }

    }

You can see that we implement Comparable in the task to delegate to the actual Comparator for MyType.

And there you have it, customized prioritization for an Executor using the Java libraries! It takes a bit of bending, but it's the cleanest that I've been able to come up with. I hope this is helpful to someone!
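For readers who want to see the pieces above working together, here is a minimal sketch under a few stated assumptions. The names PrioritizedTask, PrioritizedFuture, FutureComparator and PriorityExecutor are hypothetical, and an int priority (lower runs first) stands in for MyType. Because the queue actually holds the futures created by newTaskFor, the comparator in this sketch is typed on Runnable and unwraps the future to reach the original task. It uses Java 8 syntax for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityExecutorSketch {

    /** Hypothetical task: a Callable that carries its own priority (lower runs first). */
    static class PrioritizedTask implements Callable<String> {
        final int priority;
        final Callable<String> body;

        PrioritizedTask(int priority, Callable<String> body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public String call() throws Exception {
            return body.call();
        }
    }

    /** FutureTask that keeps a reference to the task it wraps. */
    static class PrioritizedFuture<V> extends FutureTask<V> {
        final PrioritizedTask task;

        PrioritizedFuture(Callable<V> callable) {
            super(callable);
            // Assumption: only PrioritizedTask instances are ever submitted.
            this.task = (PrioritizedTask) callable;
        }
    }

    /** Orders what the queue really holds: the futures created by newTaskFor. */
    static class FutureComparator implements Comparator<Runnable> {
        @Override
        public int compare(Runnable a, Runnable b) {
            int first = ((PrioritizedFuture<?>) a).task.priority;
            int second = ((PrioritizedFuture<?>) b).task.priority;
            return Integer.compare(first, second);
        }
    }

    static class PriorityExecutor extends ThreadPoolExecutor {
        PriorityExecutor(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                    new PriorityBlockingQueue<Runnable>(11, new FutureComparator()));
        }

        @Override
        protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
            return new PrioritizedFuture<V>(c);
        }
    }

    static List<String> runDemo() throws Exception {
        List<String> order = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch gate = new CountDownLatch(1);
        PriorityExecutor pool = new PriorityExecutor(1);
        // The first task goes straight to the single worker and holds it,
        // so the next three tasks all wait in the priority queue.
        pool.submit(new PrioritizedTask(0, () -> { gate.await(); return "blocker"; }));
        pool.submit(new PrioritizedTask(5, () -> { order.add("low"); return "low"; }));
        pool.submit(new PrioritizedTask(1, () -> { order.add("high"); return "high"; }));
        pool.submit(new PrioritizedTask(3, () -> { order.add("medium"); return "medium"; }));
        gate.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo()); // prints [high, medium, low]
    }
}
```

Note that this sketch only covers submit(Callable). Tasks passed to execute(Runnable) or submit(Runnable) do not go through this newTaskFor overload, so the cast in the comparator can fail with a ClassCastException once they are queued.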

Mike answered Sep 20 '22 05:09



At first blush it would seem you could define an interface for your tasks that extends Runnable or Callable<T> and Comparable. Then wrap a ThreadPoolExecutor with a PriorityBlockingQueue as the queue, and only accept tasks that implement your interface.
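A rough sketch of that first idea, assuming tasks are handed over with execute() only, might look like the following. PriorityRunnable, PriorityOnlyExecutor and the task helper are hypothetical names, and lower priority values run first. Since execute() queues the task object itself, the queue can fall back on the tasks' natural ordering.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ComparableTaskSketch {

    /** Hypothetical task interface: a Runnable that can order itself (lower runs first). */
    interface PriorityRunnable extends Runnable, Comparable<PriorityRunnable> {
        int priority();

        @Override
        default int compareTo(PriorityRunnable other) {
            return Integer.compare(priority(), other.priority());
        }
    }

    /** Wraps the pool so that nothing but PriorityRunnable can reach the queue. */
    static class PriorityOnlyExecutor {
        // No Comparator is passed, so the queue uses the tasks' natural ordering.
        private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());

        void execute(PriorityRunnable task) {
            pool.execute(task); // queued as-is, with no FutureTask wrapper
        }

        void shutdownAndWait() throws InterruptedException {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    /** Small helper that builds a PriorityRunnable from a level and a body. */
    static PriorityRunnable task(final int level, final Runnable body) {
        return new PriorityRunnable() {
            @Override public int priority() { return level; }
            @Override public void run() { body.run(); }
        };
    }

    static List<String> runDemo() throws InterruptedException {
        List<String> order = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch gate = new CountDownLatch(1);
        PriorityOnlyExecutor executor = new PriorityOnlyExecutor();
        // Hold the single worker so the remaining tasks pile up in the queue.
        executor.execute(task(0, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        executor.execute(task(5, () -> order.add("low")));
        executor.execute(task(1, () -> order.add("high")));
        executor.execute(task(3, () -> order.add("medium")));
        gate.countDown();
        executor.shutdownAndWait();
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // prints [high, medium, low]
    }
}
```

The trade-off is that execute() returns nothing, so callers who need a Future for the result are not served by this variant.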

Taking your comment into account, it looks like one option is to extend ThreadPoolExecutor, and override the submit() methods. Refer to AbstractExecutorService to see what the default ones look like; all they do is wrap the Runnable or Callable in a FutureTask and execute() it. I'd probably do this by writing a wrapper class that implements ExecutorService and delegates to an anonymous inner ThreadPoolExecutor. Wrap the submitted tasks in something that carries your priority, so that your Comparator can get at it.
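The wrapping idea can be sketched as follows, with some simplifications that are assumptions rather than part of the answer: the wrapper (hypothetically named PriorityService) exposes a single submit(task, priority) method instead of implementing all of ExecutorService, and the wrapping class RankedFuture is made Comparable instead of using a separate Comparator. An arrival counter breaks ties so that tasks of equal priority run in submission order; it does nothing to protect low-priority tasks from starvation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PrioritySubmitSketch {

    /** Hypothetical wrapper: a FutureTask carrying a priority and an arrival number. */
    static class RankedFuture<T> extends FutureTask<T> implements Comparable<RankedFuture<?>> {
        final int priority;  // lower value runs first
        final long sequence; // arrival order, used to break ties FIFO

        RankedFuture(Callable<T> task, int priority, long sequence) {
            super(task);
            this.priority = priority;
            this.sequence = sequence;
        }

        @Override
        public int compareTo(RankedFuture<?> other) {
            int byPriority = Integer.compare(priority, other.priority);
            return byPriority != 0 ? byPriority : Long.compare(sequence, other.sequence);
        }
    }

    /** Delegates to an inner ThreadPoolExecutor and wraps every task before queuing it. */
    static class PriorityService {
        private final AtomicLong counter = new AtomicLong();
        private final ThreadPoolExecutor pool;

        PriorityService(int threads) {
            pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                    new PriorityBlockingQueue<Runnable>());
        }

        <T> Future<T> submit(Callable<T> task, int priority) {
            RankedFuture<T> future =
                    new RankedFuture<T>(task, priority, counter.getAndIncrement());
            pool.execute(future); // the queue only ever sees RankedFuture instances
            return future;
        }

        void shutdownAndWait() throws InterruptedException {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    static List<String> runDemo() throws Exception {
        List<String> order = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch gate = new CountDownLatch(1);
        PriorityService service = new PriorityService(1);
        // Hold the single worker so the remaining tasks wait in the queue.
        service.submit(() -> { gate.await(); return "blocker"; }, 0);
        service.submit(() -> { order.add("a"); return "a"; }, 2);
        service.submit(() -> { order.add("b"); return "b"; }, 1);
        service.submit(() -> { order.add("c"); return "c"; }, 2);
        Future<String> last = service.submit(() -> { order.add("d"); return "d"; }, 1);
        gate.countDown();
        last.get(5, TimeUnit.SECONDS); // results are still available through the Future
        service.shutdownAndWait();
        return order;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo()); // prints [b, d, a, c]
    }
}
```

Keeping the ThreadPoolExecutor private means callers cannot reach its plain execute() or submit() methods, so nothing that lacks a priority can end up in the queue.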

Adam Jaskiewicz answered Sep 22 '22 05:09
