I've searched a lot but could not find a solution to my problem.
I have my own class, BaseTask, that uses a ThreadPoolExecutor to handle tasks. I want task prioritization, but when I try to use a PriorityBlockingQueue I get a ClassCastException because the ThreadPoolExecutor wraps my tasks in a FutureTask object.
This makes sense because FutureTask does not implement Comparable, but how would I go about solving the priority problem? I've read that you could override newTaskFor() in ThreadPoolExecutor, but I cannot seem to find this method at all.
Any suggestions would be much appreciated!
Some code to help:
In my BaseTask class I have

    private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();

    private static final ThreadFactory sThreadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
        }
    };

    private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
            1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);

    private final BaseFutureTask<Result> mFuture;

    public BaseTask(int priority) {
        mFuture = new BaseFutureTask<Result>(mWorker, priority);
    }

    public final BaseTask<Params, Progress, Result> execute(Params... params) {
        /* Some unimportant code here */
        sExecutor.execute(mFuture);
        return this;
    }
In the BaseFutureTask class:

    @Override
    public int compareTo(BaseFutureTask another) {
        long diff = this.priority - another.priority;
        return Long.signum(diff);
    }
In the BaseThreadPoolExecutor class I override the three submit methods. The constructor in this class gets called, but none of the submit methods are.
    public class ExecutorPriority {

        public static void main(String[] args) {
            PriorityBlockingQueue<Runnable> pq =
                    new PriorityBlockingQueue<Runnable>(20, new ComparePriority());
            Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq);

            exe.execute(new RunWithPriority(2) {
                @Override
                public void run() {
                    System.out.println(this.getPriority() + " started");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException ex) {
                        Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
                    }
                    System.out.println(this.getPriority() + " finished");
                }
            });

            exe.execute(new RunWithPriority(10) {
                @Override
                public void run() {
                    System.out.println(this.getPriority() + " started");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException ex) {
                        Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
                    }
                    System.out.println(this.getPriority() + " finished");
                }
            });
        }

        private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> {
            @Override
            public int compare(T o1, T o2) {
                return o1.getPriority().compareTo(o2.getPriority());
            }
        }
    }
As you can guess, RunWithPriority is an abstract class that implements Runnable and has an Integer priority field.
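The answer does not show RunWithPriority itself. A minimal sketch consistent with that description might look like the following; the constructor signature and the final field are assumptions, and only getPriority() is actually required by the comparator above.

```java
// Hypothetical reconstruction: the original answer only describes this class.
abstract class RunWithPriority implements Runnable {

    // Integer rather than int, so getPriority().compareTo(...) works in the comparator.
    private final Integer priority;

    protected RunWithPriority(Integer priority) {
        this.priority = priority;
    }

    public Integer getPriority() {
        return priority;
    }
}
```

Note that the head of a PriorityBlockingQueue is the least element according to its comparator, so with this setup tasks with smaller priority values are dequeued first.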
You can use these helper classes:
    public class PriorityFuture<T> implements RunnableFuture<T> {

        private RunnableFuture<T> src;
        private int priority;

        public PriorityFuture(RunnableFuture<T> other, int priority) {
            this.src = other;
            this.priority = priority;
        }

        public int getPriority() {
            return priority;
        }

        public boolean cancel(boolean mayInterruptIfRunning) {
            return src.cancel(mayInterruptIfRunning);
        }

        public boolean isCancelled() {
            return src.isCancelled();
        }

        public boolean isDone() {
            return src.isDone();
        }

        public T get() throws InterruptedException, ExecutionException {
            return src.get();
        }

        public T get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            // Pass the timeout through; calling src.get() here would block indefinitely.
            return src.get(timeout, unit);
        }

        public void run() {
            src.run();
        }

        // Orders queued tasks by ascending priority value; nulls sort first.
        public static final Comparator<Runnable> COMP = new Comparator<Runnable>() {
            public int compare(Runnable o1, Runnable o2) {
                if (o1 == null && o2 == null)
                    return 0;
                else if (o1 == null)
                    return -1;
                else if (o2 == null)
                    return 1;
                else {
                    int p1 = ((PriorityFuture<?>) o1).getPriority();
                    int p2 = ((PriorityFuture<?>) o2).getPriority();
                    return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
                }
            }
        };
    }
and

    public interface PriorityCallable<T> extends Callable<T> {
        int getPriority();
    }

and this helper method:

    public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

            // Called by submit(Callable); the callable must implement PriorityCallable.
            @Override
            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
                return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
            }
        };
    }

and then use it like this:
    class LenthyJob implements PriorityCallable<Long> {

        private int priority;

        public LenthyJob(int priority) {
            this.priority = priority;
        }

        public Long call() throws Exception {
            System.out.println("Executing: " + priority);
            long num = 1000000;
            for (int i = 0; i < 1000000; i++) {
                num *= Math.random() * 1000;
                num /= Math.random() * 1000;
                if (num == 0)
                    num = 1000000;
            }
            return num;
        }

        public int getPriority() {
            return priority;
        }
    }

    public class TestPQ {

        public static void main(String[] args) throws InterruptedException, ExecutionException {
            ThreadPoolExecutor exec = getPriorityExecutor(2);
            for (int i = 0; i < 20; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("Scheduling: " + priority);
                LenthyJob job = new LenthyJob(priority);
                exec.submit(job);
            }
            // Without this the pool's threads keep the JVM alive after the jobs finish.
            exec.shutdown();
        }
    }
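One limitation of the helper method above is that it only overrides newTaskFor(Callable), so submit(Runnable) still produces a plain FutureTask, and the cast in the comparator can then fail. The sketch below is one possible variation, not the answer's own code: it subclasses FutureTask instead of wrapping it and overrides both newTaskFor overloads. The names Prioritized, PrioritizedFutureTask, priorityOf and runDemo are hypothetical, and sending tasks without a priority to the back of the queue is an arbitrary choice made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityOrderDemo {

    /** Hypothetical marker interface; not part of the original answer. */
    interface Prioritized {
        int getPriority();
    }

    /** A FutureTask that is itself Comparable, so the queue needs no comparator. */
    static final class PrioritizedFutureTask<T> extends FutureTask<T>
            implements Comparable<PrioritizedFutureTask<?>> {
        private final int priority;

        PrioritizedFutureTask(Callable<T> callable, int priority) {
            super(callable);
            this.priority = priority;
        }

        PrioritizedFutureTask(Runnable runnable, T result, int priority) {
            super(runnable, result);
            this.priority = priority;
        }

        @Override
        public int compareTo(PrioritizedFutureTask<?> other) {
            return Integer.compare(priority, other.priority);
        }
    }

    /** Assumption: tasks that carry no priority are scheduled last. */
    static int priorityOf(Object task) {
        return task instanceof Prioritized ? ((Prioritized) task).getPriority() : Integer.MAX_VALUE;
    }

    /** Only submit() is safe here; execute() with a bare Runnable bypasses newTaskFor. */
    static ThreadPoolExecutor newPriorityExecutor(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>()) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                return new PrioritizedFutureTask<T>(callable, priorityOf(callable));
            }

            @Override
            protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
                return new PrioritizedFutureTask<T>(runnable, value, priorityOf(runnable));
            }
        };
    }

    /** Records its priority when it runs, so the execution order can be inspected. */
    static final class Job implements Callable<Integer>, Prioritized {
        private final int priority;
        private final List<Integer> order;

        Job(int priority, List<Integer> order) {
            this.priority = priority;
            this.order = order;
        }

        public Integer call() {
            order.add(priority);
            return priority;
        }

        public int getPriority() {
            return priority;
        }
    }

    /** Queues jobs behind a blocked single worker and returns the order they ran in. */
    static List<Integer> runDemo(int... priorities) {
        ThreadPoolExecutor exec = newPriorityExecutor(1);
        List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // The first task goes straight to the only worker thread and holds it.
            Callable<Void> blocker = () -> { gate.await(); return null; };
            exec.submit(blocker);
            List<Future<Integer>> futures = new ArrayList<>();
            for (int p : priorities) {
                futures.add(exec.submit(new Job(p, order)));
            }
            gate.countDown(); // release the worker; queued jobs now run by priority
            for (Future<Integer> f : futures) {
                f.get();
            }
            return new ArrayList<>(order);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo(50, 10, 30)); // smaller values run first
    }
}
```

The latch is only there to make the demonstration deterministic: priority ordering applies to tasks waiting in the queue, so a task handed directly to an idle worker runs immediately regardless of its priority.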