I am implementing a thread pooling mechanism in which I'd like to execute tasks of varying priorities. I'd like to have a nice mechanism whereby I can submit a high-priority task to the service and have it be scheduled before other tasks. The priority of the task is an intrinsic property of the task itself (whether I express that task as a Callable or a Runnable is not important to me).
Now, superficially it looks like I could use a PriorityBlockingQueue as the task queue in my ThreadPoolExecutor, but that queue contains Runnable objects, which may or may not be the Runnable tasks I've submitted to it. Moreover, if I've submitted Callable tasks, it's not clear how this would ever map.
Is there a way to do this? I'd really rather not roll my own for this, since I'm far more likely to get it wrong that way.
(An aside: yes, I'm aware of the possibility of starvation for lower-priority jobs in something like this. Extra points (?!) for solutions that have a reasonable guarantee of fairness.)
Here is an example of executing a Runnable with an ExecutorService: ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(new Runnable() { public void run() { System.
I have solved this problem in a reasonable fashion, and I'll describe it below for future reference to myself and anyone else who runs into this problem with the Java Concurrent libraries.
Using a PriorityBlockingQueue as the means for holding onto tasks for later execution is indeed a step in the right direction. The problem is that the PriorityBlockingQueue must be generically instantiated to contain Runnable instances, and the Runnable interface does not declare compareTo (or anything similar) that the queue could call.
On to solving the problem. The Executor must be created with a PriorityBlockingQueue as its work queue. The queue should further be given a custom Comparator so that it can order its elements properly:
new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());
Now, a peek at CustomTaskComparator:
    import java.util.Comparator;

    public class CustomTaskComparator implements Comparator<MyType> {
        @Override
        public int compare(MyType first, MyType second) {
            // Placeholder: compute the ordering of the two MyType values here.
            return comparison;
        }
    }
Everything looks pretty straightforward up to this point. It gets a bit sticky here. Our next problem is to deal with the creation of FutureTasks from the Executor. In the Executor, we must override newTaskFor like so:
    @Override
    protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
        // Override the default FutureTask creation and retrofit it with
        // a custom task. This is done so that prioritization can be accomplished.
        return new CustomFutureTask(c);
    }
Where c is the Callable task that we're trying to execute. Now, let's have a peek at CustomFutureTask:
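    import java.util.concurrent.Callable;
    import java.util.concurrent.FutureTask;

    public class CustomFutureTask extends FutureTask {
        private CustomTask task;

        public CustomFutureTask(Callable callable) {
            super(callable);
            // Assumes every submitted Callable is a CustomTask.
            this.task = (CustomTask) callable;
        }

        public CustomTask getTask() {
            return task;
        }
    }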
Notice the getTask method. We're gonna use that later to grab the original task out of this CustomFutureTask that we've created.
And finally, let's modify the original task that we were trying to execute:
    import java.util.concurrent.Callable;

    public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {
        private final MyType myType;

        public CustomTask(MyType myType) {
            this.myType = myType;
        }

        @Override
        public MyType call() {
            // Do some things, return something for FutureTask implementation of `call`.
            return myType;
        }

        // Comparable<CustomTask> requires a CustomTask parameter, not MyType.
        @Override
        public int compareTo(CustomTask task2) {
            return new CustomTaskComparator().compare(this.myType, task2.myType);
        }
    }
You can see that we implement Comparable in the task to delegate to the actual Comparator for MyType.
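The snippets above never show where getTask is actually called, and the comparator shown compares MyType values while the queue itself holds Runnable wrappers. One way the pieces might fit together is sketched below. This is an illustration under assumptions, not the author's exact code: an int priority (lower runs first) stands in for MyType, the queue-level comparator BY_TASK and the PriorityExecutor and PriorityExecutorSketch names are made up for the example, and the pool has a single worker so the ordering is easy to observe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityExecutorSketch {

    // Hypothetical task: an int priority stands in for MyType; lower runs first.
    static class CustomTask implements Callable<String>, Comparable<CustomTask> {
        final int priority;
        final Callable<String> body;

        CustomTask(int priority, Callable<String> body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public String call() throws Exception {
            return body.call();
        }

        @Override
        public int compareTo(CustomTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Keeps a reference to the original task so the queue can order by it.
    static class CustomFutureTask<V> extends FutureTask<V> {
        private final CustomTask task;

        CustomFutureTask(Callable<V> callable) {
            super(callable);
            this.task = (CustomTask) callable; // assumes only CustomTask is submitted
        }

        CustomTask getTask() {
            return task;
        }
    }

    static class PriorityExecutor extends ThreadPoolExecutor {
        // Queue-level ordering: unwrap each future and compare the original tasks.
        static final Comparator<Runnable> BY_TASK =
                Comparator.comparing(r -> ((CustomFutureTask<?>) r).getTask());

        PriorityExecutor() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS,
                    new PriorityBlockingQueue<Runnable>(11, BY_TASK));
        }

        @Override
        protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
            return new CustomFutureTask<V>(c);
        }
    }

    static List<String> runDemo() {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch gate = new CountDownLatch(1);
        PriorityExecutor pool = new PriorityExecutor();
        try {
            List<Future<String>> futures = new ArrayList<>();
            // The first task is handed straight to the idle worker and blocks it,
            // so the following three have to wait in the priority queue.
            futures.add(pool.submit(new CustomTask(0, () -> { gate.await(); return "blocker"; })));
            for (int priority : new int[] {5, 1, 3}) {
                String name = "p" + priority;
                futures.add(pool.submit(new CustomTask(priority, () -> { log.add(name); return name; })));
            }
            gate.countDown();
            for (Future<String> f : futures) {
                f.get();
            }
            return new ArrayList<>(log);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Note that only submit(Callable) goes through the overridden newTaskFor here. Anything passed to execute(Runnable) or submit(Runnable) would reach the queue without a CustomFutureTask wrapper, and the cast in the comparator would then fail with a ClassCastException once the queue has to compare it.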
And there you have it, customized prioritization for an Executor using the Java libraries! It takes a bit of bending, but it's the cleanest that I've been able to come up with. I hope this is helpful to someone!
At first blush it would seem you could define an interface for your tasks that extends Runnable or Callable<T> and Comparable. Then wrap a ThreadPoolExecutor with a PriorityBlockingQueue as the queue, and only accept tasks that implement your interface.
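A rough sketch of that first idea, for the Runnable case only. The names PrioritizedRunnable, PrioritizedPool, and the task helper are invented for illustration, and "lower value runs first" is an assumption. The sketch relies on execute() placing the task object itself in the queue, so a PriorityBlockingQueue with no Comparator can fall back on the tasks' natural ordering.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrioritizedPoolSketch {

    // Hypothetical task interface: a Runnable that knows its own priority.
    interface PrioritizedRunnable extends Runnable, Comparable<PrioritizedRunnable> {
        int priority(); // lower value runs first (an assumption of this sketch)

        @Override
        default int compareTo(PrioritizedRunnable other) {
            return Integer.compare(priority(), other.priority());
        }
    }

    // Thin wrapper whose only entry point requires the interface above.
    static class PrioritizedPool {
        private final ThreadPoolExecutor delegate = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());

        void execute(PrioritizedRunnable task) {
            // execute() queues the task object itself, so the queue can use
            // its natural ordering; submit() would wrap it in a FutureTask.
            delegate.execute(task);
        }

        void shutdownAndWait() throws InterruptedException {
            delegate.shutdown();
            delegate.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

    // Convenience factory that attaches a priority to a plain Runnable body.
    static PrioritizedRunnable task(int priority, Runnable body) {
        return new PrioritizedRunnable() {
            @Override public int priority() { return priority; }
            @Override public void run() { body.run(); }
        };
    }

    static List<String> runDemo() {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch gate = new CountDownLatch(1);
        PrioritizedPool pool = new PrioritizedPool();
        try {
            // Occupy the single worker so the next three tasks queue up.
            pool.execute(task(0, () -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
            pool.execute(task(5, () -> log.add("low")));
            pool.execute(task(1, () -> log.add("high")));
            pool.execute(task(3, () -> log.add("medium")));
            gate.countDown();
            pool.shutdownAndWait();
            return new ArrayList<>(log);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

This works only because execute() is used. As soon as submit() is involved, the queue sees FutureTask wrappers rather than the tasks themselves, which is the complication the question describes.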
Taking your comment into account, it looks like one option is to extend ThreadPoolExecutor, and override the submit() methods. Refer to AbstractExecutorService to see what the default ones look like; all they do is wrap the Runnable or Callable in a FutureTask and execute() it. I'd probably do this by writing a wrapper class that implements ExecutorService and delegates to an anonymous inner ThreadPoolExecutor. Wrap them in something that has your priority, so that your Comparator can get at it.
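The wrapping step might look roughly like the following. It is a sketch under assumptions rather than a full ExecutorService implementation: PrioritizedFuture, PriorityService, and the two-argument submit are hypothetical names, the priority is passed in at submission time (it could equally be read from the task), and lower values run first. A sequence number is added as a tie-breaker so that tasks of equal priority run in submission order. That keeps equal-priority work FIFO, but it does not by itself prevent starvation of lower priorities.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PrioritySubmitSketch {

    // A FutureTask that carries a priority, so the queue can order the wrappers.
    static class PrioritizedFuture<V> extends FutureTask<V>
            implements Comparable<PrioritizedFuture<?>> {
        private static final AtomicLong SEQUENCE = new AtomicLong();
        private final int priority;
        private final long sequence = SEQUENCE.getAndIncrement();

        PrioritizedFuture(Callable<V> callable, int priority) {
            super(callable);
            this.priority = priority;
        }

        @Override
        public int compareTo(PrioritizedFuture<?> other) {
            int byPriority = Integer.compare(priority, other.priority);
            // Equal priorities fall back to submission order.
            return byPriority != 0 ? byPriority : Long.compare(sequence, other.sequence);
        }
    }

    // Wrapper that does the FutureTask wrapping itself, then delegates to execute().
    static class PriorityService {
        private final ThreadPoolExecutor delegate = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());

        <V> Future<V> submit(Callable<V> task, int priority) {
            PrioritizedFuture<V> future = new PrioritizedFuture<>(task, priority);
            delegate.execute(future);
            return future;
        }

        void shutdown() {
            delegate.shutdown();
        }
    }

    static Callable<String> named(String name, List<String> log) {
        return () -> {
            log.add(name);
            return name;
        };
    }

    static List<String> runDemo() {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch gate = new CountDownLatch(1);
        PriorityService service = new PriorityService();
        try {
            List<Future<String>> futures = new ArrayList<>();
            // Occupy the single worker so the remaining tasks queue up.
            futures.add(service.submit(() -> { gate.await(); return "blocker"; }, 0));
            futures.add(service.submit(named("a", log), 2));
            futures.add(service.submit(named("b", log), 1));
            futures.add(service.submit(named("c", log), 2));
            futures.add(service.submit(named("d", log), 1));
            gate.countDown();
            for (Future<String> f : futures) {
                f.get();
            }
            return new ArrayList<>(log);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because the wrapper is the only thing that ever reaches the queue, a PriorityBlockingQueue with natural ordering is enough and no separate Comparator is needed.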