Why, oh why doesn't java.util.concurrent
provide a queue length indicators for its ExecutorService
s? Recently I found myself doing something like this:
ExecutorService queue = Executors.newSingleThreadExecutor();
AtomicInteger queueLength = new AtomicInteger();
...
public void addTaskToQueue(Runnable runnable) {
if (queueLength.get() < MAX_QUEUE_LENGTH) {
queueLength.incrementAndGet(); // Increment queue when submitting task.
queue.submit(new Runnable() {
public void run() {
runnable.run();
queueLength.decrementAndGet(); // Decrement queue when task done.
}
});
} else {
// Trigger error: too long queue
}
}
Which works ok, but... I think this really should be implemented as a part of the ExecutorService
. It's dumb and error prone to carry around a counter separated from the actual queue, whose length the counter is supposed to indicate (reminds me of C arrays). But, ExecutorService
s are obtained via static factory methods, so there's no way to simply extend the otherwise excellent single thread executor and add a queue counter. So what should I do:
There is a more direct way:
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newSingleThreadExecutor();
// add jobs
// ...
int size = executor.getQueue().size();
Although you might consider not to use the convenience create methods of Executor, but rather create the executor directly to get rid of the cast and thus be sure that the executor will always actually be a ThreadPoolExecutor
, even if the implementation of Executors.newSingleThreadExecutor
would change some day.
ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );
This is directly copied from Executors.newSingleThreadExecutor
in JDK 1.6. The LinkedBlockingQueue
that is passed to the constructor is actually the very object that you will get back from getQueue
.
While you can check the queue size directly. Another way of dealing with a queue that's getting too long is making the internal queue bounded.
public static
ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(queueSize, true));
}
This will cause RejectedExecutionExceptions (see this) when you exceed the limit.
If you want to avoid the exception the calling thread can be hijacked to execute the function. See this SO question for a solution.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With