
ThreadPoolExecutor's getActiveCount()

I have a ThreadPoolExecutor that seems to be lying to me when I call getActiveCount(). I haven't done much multithreaded programming, though, so perhaps I'm doing something wrong.

Here's my TPE:

@Override
public void afterPropertiesSet() throws Exception {

    BlockingQueue<Runnable> workQueue;
    int maxQueueLength = threadPoolConfiguration.getMaximumQueueLength();
    if (maxQueueLength == 0) {
        workQueue = new LinkedBlockingQueue<Runnable>();
    } else {
        workQueue = new LinkedBlockingQueue<Runnable>(maxQueueLength);
    }

    pool = new ThreadPoolExecutor(
                   threadPoolConfiguration.getCorePoolSize(),
                   threadPoolConfiguration.getMaximumPoolSize(),
                   threadPoolConfiguration.getKeepAliveTime(),
                   TimeUnit.valueOf(threadPoolConfiguration.getTimeUnit()),
                   workQueue,
                   // Default thread factory creates normal-priority,
                   // non-daemon threads.
                   Executors.defaultThreadFactory(),
                   // Run any rejected task directly in the calling thread.
                   // In this way no records will be lost due to rejection
                   // however, no records will be added to the workQueue
                   // while the calling thread is processing a Task, so set
                   // your queue-size appropriately.
                   //
                   // This also means MaxThreadCount+1 tasks may run
                   // concurrently. If you REALLY want a max of MaxThreadCount
                   // threads don't use this.
                   new ThreadPoolExecutor.CallerRunsPolicy());
}

In this class I also have a DAO that I pass into my Runnable (FooWorker), like so:

@Override
public void addTask(FooRecord record) {
    if (pool == null) {
        throw new FooException(ERROR_THREAD_POOL_CONFIGURATION_NOT_SET);
    }
    pool.execute(new FooWorker(context, calculator, dao, record));
}

FooWorker runs record (the only non-singleton) through a state machine via calculator then sends the transitions to the database via dao, like so:

public void run() {
    calculator.calculate(record);
    dao.save(record);
}

Once my main thread is done creating new tasks, I try to wait until all of the worker threads have finished successfully:

while (pool.getActiveCount() > 0) {
    recordHandler.awaitTermination(terminationTimeout, 
                                   terminationTimeoutUnit);
}

What I'm seeing from output logs (which are presumably unreliable due to the threading) is that getActiveCount() is returning zero too early, and the while() loop is exiting while my last threads are still printing output from calculator.

Note that I've also tried calling pool.shutdown() and then awaitTermination(), but then the pool is still shut down the next time my job runs.

My only guess is that inside a thread, when I send data into the dao (since it's a singleton created by Spring in the main thread...), Java considers the thread inactive because (I assume) the work is being processed in, or is waiting on, the main thread.

Intuitively, based only on what I'm seeing, that's my guess. But... is that really what's happening? Is there a way to "do it right" without manually incrementing a variable at the top of run() and decrementing it at the end to track the number of running threads?

If the answer is "don't pass in the dao", then wouldn't I have to "new" a DAO for every thread? My process is already a (beautiful, efficient) beast, but that would really suck.

inanutshellus asked Sep 01 '11 14:09



2 Answers

As the Javadoc of getActiveCount() states, it returns an approximate value, so you should not base any major business logic decisions on it.

If you want to wait for all scheduled tasks to complete, then you should simply use

pool.shutdown();
pool.awaitTermination(terminationTimeout, terminationTimeoutUnit);
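A minimal sketch of that shutdown-then-wait pattern follows. It checks the boolean that awaitTermination() returns, because the call can time out before the pool has drained. Creating a fresh pool for every job is an assumption made here to get around the "pool is still shut down on the next run" problem from the question, and the class name, pool size, timeout, and counter are all hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownAndAwaitSketch {

    // Runs taskCount trivial tasks on a pool created just for this job
    // and returns how many of them completed.
    static int runJob(int taskCount) throws InterruptedException {
        // Assumption: a new pool per job, since a pool that has been
        // shut down cannot accept tasks again.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet);
        }

        // Stop accepting new tasks; already submitted tasks still run.
        pool.shutdown();

        // awaitTermination() returns false if the timeout elapsed first,
        // so keep waiting until it reports that the pool has terminated.
        while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
            // Tasks are still running; a real job might log progress here.
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("first job completed: " + runJob(100));
        System.out.println("second job completed: " + runJob(100));
    }
}
```

Whether a pool per job is acceptable depends on how often the job runs; for a long-lived shared pool, waiting on the individual tasks is usually the better fit.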

If you need to wait for a specific task to finish, you should use submit() instead of execute() and then check the Future object for completion (either using isDone() if you want to do it non-blocking or by simply calling get() which blocks until the task is done).
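The submit() and Future approach can be sketched roughly as follows. It waits for one batch of tasks while leaving the pool usable for the next batch. The class name, method name, and counter are hypothetical stand-ins for the FooWorker tasks in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class FutureWaitSketch {

    // Submits taskCount trivial tasks, blocks until every one of them has
    // finished, and returns how many ran. The pool is NOT shut down.
    static int submitAndWait(ExecutorService pool, int taskCount)
            throws InterruptedException, ExecutionException {
        AtomicInteger completed = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();

        for (int i = 0; i < taskCount; i++) {
            // submit() returns a Future; execute() does not.
            futures.add(pool.submit(() -> {
                completed.incrementAndGet();
            }));
        }

        // get() blocks until the task is done and rethrows any task
        // failure wrapped in an ExecutionException.
        for (Future<?> future : futures) {
            future.get();
        }
        return completed.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println("batch 1: " + submitAndWait(pool, 100));
            // The same pool is reused for a second batch.
            System.out.println("batch 2: " + submitAndWait(pool, 100));
        } finally {
            pool.shutdown();
        }
    }
}
```

A side benefit of get() is that exceptions thrown inside a task surface in the waiting thread instead of being lost.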

Joachim Sauer answered Sep 22 '22 22:09


The documentation suggests that the method getActiveCount() on ThreadPoolExecutor is not an exact number:

getActiveCount

public int getActiveCount()

Returns the approximate number of threads that are actively executing tasks.

Returns: the number of threads

Personally, when I am doing multithreaded work such as this, I use a variable that I increment as I add tasks, and decrement as I grab their output.
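One possible shape for such a counter is sketched below. It is a variation on the idea above, not the exact code: the counter is decremented when a task finishes rather than when its output is collected, and it is guarded by a lock so the waiting thread can sleep instead of polling. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

class PendingTaskTracker {
    private final ExecutorService pool;
    private final Object lock = new Object();
    private int pending; // guarded by lock

    PendingTaskTracker(ExecutorService pool) {
        this.pool = pool;
    }

    // Counts the task before handing it to the pool, so the count can
    // never be zero while a submitted task is queued or running.
    void addTask(Runnable task) {
        synchronized (lock) {
            pending++;
        }
        try {
            pool.execute(() -> {
                try {
                    task.run();
                } finally {
                    taskFinished(); // runs even if the task throws
                }
            });
        } catch (RejectedExecutionException e) {
            taskFinished(); // the task never started, so undo the count
            throw e;
        }
    }

    private void taskFinished() {
        synchronized (lock) {
            if (--pending == 0) {
                lock.notifyAll();
            }
        }
    }

    // Blocks until every task added so far has finished.
    void awaitIdle() throws InterruptedException {
        synchronized (lock) {
            while (pending > 0) {
                lock.wait();
            }
        }
    }

    int pendingCount() {
        synchronized (lock) {
            return pending;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        PendingTaskTracker tracker = new PendingTaskTracker(pool);
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < 50; i++) {
            tracker.addTask(processed::incrementAndGet);
        }
        tracker.awaitIdle();
        System.out.println("processed: " + processed.get());
        pool.shutdown();
    }
}
```

Unlike getActiveCount(), this count is exact, because it is incremented before submission and decremented only after run() returns.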

nicholas.hauschild answered Sep 23 '22 22:09