Fixed Thread Pool threads blocking, when enough tasks submitted

I have a process which needs to compute many small tasks in parallel, and then process the results in the natural order of the tasks. To do this, I have the following setup:

A simple ExecutorService, and a blocking queue I will use to keep the Future objects returned when a Callable is submitted to the executor:

ExecutorService exec = Executors.newFixedThreadPool(15);
LinkedBlockingQueue<Future<MyTask>> futures = new LinkedBlockingQueue<Future<MyTask>>(15 * 64);

Some debugging code to count the number of submitted and number of processed tasks and write them out on a regular basis (note that processed is incremented at the end of the task code itself):

AtomicLong processed = new AtomicLong(0);
AtomicLong submitted = new AtomicLong(0);

Timer statusTimer = new Timer();
statusTimer.schedule(new TimerTask() {
      @Override
      public void run() {
        l.info("Futures: " + futures.size() + "; Submitted: " + submitted.get() + "; Processed: " + processed.get() + "; Diff: " + (submitted.get() - processed.get()));
      }
}, 60 * 1000, 60 * 1000);

A thread which takes tasks from a queue (which is actually a generator) and submits them to the executor, putting the resulting Future in the futures queue (this is how I make sure I do not submit too many tasks and run out of memory):

Thread submitThread = new Thread(() ->
{
    MyTask task;
    try {
        while ((task = taskQueue.poll()) != null) {
            futures.put(exec.submit(task));
            submitted.incrementAndGet();
        }
    } catch (Exception e) { l.error("Unexpected Exception", e); }
}, "SubmitTasks");
submitThread.start();

The current thread then takes finished tasks off the futures queue and handles the result:

while (!futures.isEmpty() || submitThread.isAlive()) {
    MyTask task = futures.take().get();
    //process result
}

When I run this on a server with 8 cores (note that the code currently uses 15 threads), CPU utilization peaks at only about 60%. My debug output looks like this:

INFO : Futures: 960; Submitted: 1709710114; Processed: 1709709167; Diff: 947
INFO : Futures: 945; Submitted: 1717159751; Processed: 1717158862; Diff: 889
INFO : Futures: 868; Submitted: 1724597808; Processed: 1724596954; Diff: 853
INFO : Futures: 940; Submitted: 1732030120; Processed: 1732029252; Diff: 871
INFO : Futures: 960; Submitted: 1739538576; Processed: 1739537758; Diff: 818
INFO : Futures: 960; Submitted: 1746965761; Processed: 1746964811; Diff: 950

A thread dump shows that many of the thread pool threads are blocking like this:

"pool-1-thread-14" #30 prio=5 os_prio=0 tid=0x00007f25c802c800 nid=0x10b2 waiting on condition [0x00007f26151d5000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f2fbb0001b0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:897)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

My interpretation of the debug output is that at any given point in time, I have at least several hundred tasks which have been submitted to the executor service but have not yet been processed (I can also confirm in the stack trace that the SubmitTasks thread is blocked on LinkedBlockingQueue.put). Yet the stack trace and the server utilization stats show that the ExecutorService worker threads are blocked in LinkedBlockingQueue.take, which I assume means the internal task queue is empty.

What am I reading wrong?

xpa1492 asked Dec 10 '15 07:12

1 Answer

2.5 years later, I saw that this question had received some views and figured I would provide a follow-up.

After a number of changes and tests, I ended up grouping tasks into batches of 10000 each (that is, each Future was responsible for a group of 10000 MyTask tasks instead of just one). This way the ExecutorService executed about 10-20 tasks per second, instead of the quite high 100000-200000 I was "asking" it to do. This approach increased the speed significantly and resulted in a full 100% CPU utilization.
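A minimal sketch of the batching idea, not the original code: compute() is a hypothetical stand-in for the work of one MyTask, and the class name, method names and queue capacity are assumptions chosen for illustration. Each Future covers a contiguous range of tasks, so results still come back in the natural task order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BatchedSubmit {

    // Hypothetical stand-in for the work done by one MyTask.
    static long compute(long input) {
        return input * input;
    }

    // Runs taskCount small tasks in batches and sums the results in task order.
    static long runBatched(long taskCount, int batchSize, int threads) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        // Bounded queue: put() blocks the submitter, which caps memory use.
        BlockingQueue<Future<List<Long>>> futures = new ArrayBlockingQueue<>(threads * 4);
        long batchCount = (taskCount + batchSize - 1) / batchSize;

        Thread submitThread = new Thread(() -> {
            try {
                for (long start = 0; start < taskCount; start += batchSize) {
                    final long from = start;
                    final long to = Math.min(start + batchSize, taskCount);
                    // One Future covers the whole range [from, to).
                    futures.put(exec.submit(() -> {
                        List<Long> results = new ArrayList<>((int) (to - from));
                        for (long i = from; i < to; i++) {
                            results.add(compute(i));
                        }
                        return results;
                    }));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "SubmitTasks");
        submitThread.setDaemon(true);
        submitThread.start();

        long sum = 0;
        try {
            // The batch count is known up front, so the consumer never
            // has to guess whether the submitter is still alive.
            for (long b = 0; b < batchCount; b++) {
                for (long result : futures.take().get()) {
                    sum += result;
                }
            }
        } finally {
            exec.shutdown();
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runBatched(1_000_000, 10_000, 15));
    }
}
```

With a batch size of 10000, the executor's internal queue is touched once per 10000 units of work instead of once per unit, which is the effect described above. The right batch size likely depends on how long a single task takes, so treat 10000 as a starting point to measure against.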

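With the benefit of hindsight, it seems "unreasonable" to execute more than 100k tasks per second. My reading, which remains a conjecture, is that too much time was spent on concurrency management (locking overhead) and context switching. The thread dump in the question seems consistent with this: the workers are parked in ReentrantLock.lockInterruptibly inside LinkedBlockingQueue.take, which suggests they were contending for the queue's lock rather than waiting on an empty queue.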
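The task rate can be cross-checked against the debug output in the question. This is a small sketch, with a hypothetical class and method name, that assumes the first two "Submitted" values were logged exactly 60 seconds apart, as the timer configuration suggests:

```java
class ThroughputEstimate {

    // Tasks per second between two "Submitted" counters logged intervalSeconds apart.
    static double tasksPerSecond(long earlier, long later, long intervalSeconds) {
        return (later - earlier) / (double) intervalSeconds;
    }

    public static void main(String[] args) {
        // First two "Submitted" values from the debug output; the timer fires every 60 s.
        double rate = tasksPerSecond(1709710114L, 1717159751L, 60);
        System.out.printf("about %.0f tasks per second%n", rate);
    }
}
```

That works out to roughly 124000 tasks per second. Spread over 15 worker threads, this leaves on the order of 120 microseconds of thread time per task, a budget that queue locking and thread wake-ups could plausibly consume a large share of.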

xpa1492 answered Nov 23 '22 12:11
