How to implement PriorityBlockingQueue with ThreadPoolExecutor and custom tasks

I've searched a lot but could not find a solution to my problem.

I have my own class, BaseTask, that uses a ThreadPoolExecutor to handle tasks. I want task prioritization, but when I try to use a PriorityBlockingQueue I get ClassCastException because the ThreadPoolExecutor wraps my Tasks into a FutureTask object.

This obviously makes sense because the FutureTask does not implement Comparable, but how would I go on to solve the priority problem? I've read that you could override newTaskFor() in ThreadPoolExecutor, but I can not seem to find this method at all...?

Any suggestions would be much appreciated!

Some code to help:

In my BaseTask class I have

// Pending work shared by every BaseTask. A PriorityBlockingQueue built without a
// Comparator orders its elements by natural ordering, so every Runnable placed in
// it must be Comparable with the others.
private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();

// Names worker threads "AsyncTask #1", "AsyncTask #2", ... in creation order.
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};

// NOTE(review): if BaseThreadPoolExecutor forwards these arguments unchanged to
// ThreadPoolExecutor, the unbounded queue means the pool never grows past the
// single core thread; the Integer.MAX_VALUE maximum is then never used. Confirm.
private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
    1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);

// The prioritized future that carries this task's work onto the executor.
private final BaseFutureTask<Result> mFuture;

/**
 * Creates a task whose future carries the given priority.
 *
 * @param priority value handed to the BaseFutureTask used for queue ordering
 */
public BaseTask(int priority) {
    mFuture = new BaseFutureTask<Result>(mWorker, priority);
}

/**
 * Hands this task's future to the shared executor.
 *
 * <p>The future goes in through {@code execute()}, not {@code submit()}, so any
 * {@code submit()} overrides on the executor are not involved on this path.
 *
 * @param params arguments for the task
 * @return this task, so calls can be chained
 */
public final BaseTask<Params, Progress, Result> execute(Params... params) {

    /* Some unimportant code here */

    sExecutor.execute(mFuture);
    // The method declares a return type; without this statement it does not compile.
    return this;
}

In BaseFutureTask class

/**
 * Orders tasks by their {@code priority} value: a task with a smaller value
 * compares as less than a task with a larger one.
 *
 * @param another the task to compare against
 * @return -1, 0 or 1 when this task's priority is respectively smaller than,
 *         equal to, or larger than {@code another}'s
 */
@Override
public int compareTo(BaseFutureTask another) {
    // Compare directly rather than subtracting: a subtraction is evaluated in the
    // operands' own type first, so extreme priorities can overflow and flip the sign.
    if (this.priority < another.priority) {
        return -1;
    }
    if (this.priority > another.priority) {
        return 1;
    }
    return 0;
}

In the BaseThreadPoolExecutor class I override the three submit methods. The constructor of this class gets called, but none of the submit methods are.

2 Answers

public class ExecutorPriority {  public static void main(String[] args) {      PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new ComparePriority());      Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq);     exe.execute(new RunWithPriority(2) {          @Override         public void run() {              System.out.println(this.getPriority() + " started");             try {                 Thread.sleep(3000);             } catch (InterruptedException ex) {                 Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);             }             System.out.println(this.getPriority() + " finished");         }     });     exe.execute(new RunWithPriority(10) {          @Override         public void run() {             System.out.println(this.getPriority() + " started");             try {                 Thread.sleep(3000);             } catch (InterruptedException ex) {                 Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);             }             System.out.println(this.getPriority() + " finished");         }     });  }  private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> {      @Override     public int compare(T o1, T o2) {         return o1.getPriority().compareTo(o2.getPriority());     } } 


As you can guess, RunWithPriority is an abstract class that implements Runnable and has an Integer priority field.

You can use these helper classes:

/**
 * A {@link RunnableFuture} decorator that attaches an integer priority to another
 * future and delegates every {@code Future}/{@code Runnable} operation to it.
 *
 * @param <T> result type of the wrapped future
 */
public class PriorityFuture<T> implements RunnableFuture<T> {

    private final RunnableFuture<T> src;
    private final int priority;

    /**
     * @param other    the future that does the work
     * @param priority the priority reported by {@link #getPriority()}
     */
    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    /** @return the priority given at construction */
    public int getPriority() {
        return priority;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return src.isCancelled();
    }

    @Override
    public boolean isDone() {
        return src.isDone();
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    /**
     * Waits at most the given time for the wrapped future's result.
     *
     * @throws TimeoutException if the wait timed out
     */
    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Forward the timeout; calling the untimed get() here would block indefinitely.
        return src.get(timeout, unit);
    }

    @Override
    public void run() {
        src.run();
    }

    /**
     * Orders runnables by ascending {@link #getPriority()}; {@code null} sorts
     * before any non-null element. Non-null arguments must be
     * {@code PriorityFuture} instances, otherwise a ClassCastException is thrown.
     */
    public static final Comparator<Runnable> COMP = new Comparator<Runnable>() {
        @Override
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null || o2 == null) {
                if (o1 == o2) {
                    return 0;
                }
                return o1 == null ? -1 : 1;
            }
            int p1 = ((PriorityFuture<?>) o1).getPriority();
            int p2 = ((PriorityFuture<?>) o2).getPriority();
            if (p1 == p2) {
                return 0;
            }
            return p1 > p2 ? 1 : -1;
        }
    };
}


/**
 * A {@link Callable} that also reports an integer priority.
 *
 * @param <T> the result type of {@code call()}
 */
public interface PriorityCallable<T> extends Callable<T> {

    /**
     * Reports the priority associated with this task.
     *
     * @return the task's priority
     */
    int getPriority();
}

AND this helper method:

/**
 * Creates a fixed-size pool whose pending tasks are held in a
 * PriorityBlockingQueue ordered by {@code PriorityFuture.COMP}.
 *
 * <p>Tasks passed to {@code submit(...)} are wrapped in a {@code PriorityFuture}.
 * A {@code PriorityCallable} supplies its own priority; any other Callable or
 * Runnable is given {@code Integer.MAX_VALUE} instead of failing with a
 * ClassCastException. Runnables passed straight to {@code execute(...)} are not
 * wrapped and must already be {@code PriorityFuture} instances.
 *
 * @param nThreads core and maximum number of worker threads
 * @return the configured executor
 */
public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    // Priority assigned to submitted tasks that do not declare one of their own.
    final int defaultPriority = Integer.MAX_VALUE;

    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            int priority = defaultPriority;
            if (callable instanceof PriorityCallable) {
                priority = ((PriorityCallable<?>) callable).getPriority();
            }
            return new PriorityFuture<T>(super.newTaskFor(callable), priority);
        }

        // submit(Runnable) and submit(Runnable, T) go through this overload; leaving
        // it un-overridden would enqueue a plain FutureTask the comparator cannot cast.
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new PriorityFuture<T>(super.newTaskFor(runnable, value), defaultPriority);
        }
    };
}

AND then use it like this:

/**
 * A CPU-bound sample job that carries a priority and announces itself when run.
 */
class LenthyJob implements PriorityCallable<Long> {
    private final int priority;

    /**
     * @param priority the priority reported by {@link #getPriority()}
     */
    public LenthyJob(int priority) {
        this.priority = priority;
    }

    /**
     * Prints the job's priority, then runs one million multiply/divide rounds
     * with random factors.
     *
     * @return the final value of the accumulator
     */
    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            // Reset when the accumulator collapses to zero so later rounds still do work.
            if (num == 0) {
                num = 1000000;
            }
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

/**
 * Submits twenty jobs with random priorities to a two-thread priority executor.
 */
public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);
        try {
            for (int i = 0; i < 20; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("Scheduling: " + priority);
                LenthyJob job = new LenthyJob(priority);
                exec.submit(job);
            }
        } finally {
            // Queued jobs still run after shutdown(); without it the pool's worker
            // threads keep the JVM alive once all jobs have completed.
            exec.shutdown();
        }
    }
}
