I'm trying to understand the behavior of queues in ThreadPoolExecutor. In the program below, when I use LinkedBlockingQueue, the thread pool runs only one task at a time. But if I replace the LinkedBlockingQueue with SynchronousQueue, all 5 tasks are picked up by the pool at once. How does SynchronousQueue differ from LinkedBlockingQueue in this case?
Java program :
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Sample {
    public static void main(String[] args) throws InterruptedException {
        // Swap the two lines below to compare the two queue types.
        LinkedBlockingQueue<Runnable> threadPoolQueue = new LinkedBlockingQueue<>();
        // SynchronousQueue<Runnable> threadPoolQueue = new SynchronousQueue<>();
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE, keepAliveTime = 60 seconds
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, threadPoolQueue, threadFactory);
        Runnable np;
        for (int i = 1; i <= 5; i++) {
            np = new SampleWorker("ThreadPoolWorker " + i);
            tpe.submit(np);
        }
        System.out.println(tpe.getCorePoolSize());
        System.out.println(tpe.getPoolSize());
        System.out.println(tpe.getActiveCount());
        // shutdown() must come before awaitTermination(); otherwise the pool
        // never terminates and the wait only ends when the timeout expires.
        tpe.shutdown();
        tpe.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        System.out.println("Main task finished");
    }
}

class SampleWorker implements Runnable {
    private String workerName;

    SampleWorker(String tName) {
        workerName = tName;
    }

    @Override
    public void run() {
        try {
            // Print the worker name every 3 seconds, 10 times.
            for (int i = 1; i <= 10; i++) {
                Thread.sleep(3000);
                System.out.println(this.workerName);
            }
            System.out.println(this.workerName + " finished");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
SynchronousQueue is a special blocking queue with no internal capacity. It lets threads exchange data or information in a thread-safe manner: each insert must be handed directly to a thread that is waiting to remove it.
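To make the "no internal capacity" point concrete, here is a minimal sketch of how offer() behaves with and without a waiting consumer. The class and method names (SynchronousQueueDemo, offerWithoutConsumer, offerWithConsumer) and the 5 second timeout are made up for this illustration; only the SynchronousQueue calls come from the JDK.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {

    // No thread is waiting in take(), so there is nobody to hand the item to.
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("task");
    }

    // A consumer thread calls take(); the timed offer waits for it to arrive.
    static boolean offerWithConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true); // do not keep the JVM alive if the hand-off fails
        consumer.start();
        try {
            // Wait up to 5 seconds (arbitrary value) for the consumer to take the item.
            boolean accepted = queue.offer("task", 5, TimeUnit.SECONDS);
            consumer.join(1000);
            return accepted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer without consumer: " + offerWithoutConsumer());
        System.out.println("offer with consumer:    " + offerWithConsumer());
    }
}
```

The non-timed offer() is the call that matters for ThreadPoolExecutor, since that is the one it uses when a task is submitted.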
keepAliveTime - when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating. unit - the time unit for the keepAliveTime argument. workQueue - the queue to use for holding tasks before they are executed.
corePoolSize: The ThreadPoolExecutor has an attribute corePoolSize that determines how many threads it will start eagerly; once that many threads exist, new threads are only started when the queue is full. maximumPoolSize: This attribute determines the maximum number of threads that can be started. You can set this to Integer.
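The corePoolSize rule described above can be checked with a small bounded queue. The following is only a sketch: the class name CorePoolSizeDemo, the pool limits (1 core thread, 3 maximum) and the queue capacity of 2 are assumptions picked for the illustration. Every task blocks on a latch so the pool state can be read while all five tasks are still pending.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CorePoolSizeDemo {

    // Returns { pool size, queue size } observed after submitting five blocked tasks.
    static int[] poolAndQueueSizeAfterFiveTasks() {
        CountDownLatch release = new CountDownLatch(1);
        // Assumed settings: corePoolSize = 1, maximumPoolSize = 3, queue capacity = 2.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));

        Runnable blocked = () -> {
            try {
                release.await(); // keep the worker busy until the sizes are read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Task 1 starts the core thread, tasks 2 and 3 fill the queue,
        // tasks 4 and 5 find the queue full and each start an extra thread.
        for (int i = 0; i < 5; i++) {
            pool.execute(blocked);
        }
        int[] sizes = { pool.getPoolSize(), pool.getQueue().size() };

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sizes;
    }

    public static void main(String[] args) {
        int[] sizes = poolAndQueueSizeAfterFiveTasks();
        System.out.println("pool size = " + sizes[0] + ", queued tasks = " + sizes[1]);
    }
}
```

With these settings the pool grows to 3 threads and 2 tasks stay queued; a sixth task would be rejected because both the queue and the pool are at their limits.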
ThreadPoolExecutor is an ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods. It also provides various utility methods to check current thread statistics and control them.
When you submit a task to ThreadPoolExecutor, it works like this:
if (numberOfWorkingThreads < corePoolSize) {
    startNewThreadAndRunTask();
} else if (workQueue.offer(task)) {
    if (numberOfWorkingThreads == 0) {
        startNewThreadAndRunTask();
    }
} else if (numberOfWorkingThreads < maxPoolSize) {
    startNewThreadAndRunTask();
} else {
    rejectTask();
}
With a LinkedBlockingQueue created without a capacity (that is, unbounded), workQueue.offer(task) will always succeed, so only one thread is ever started.

SynchronousQueue.offer(task) succeeds only when another thread is already waiting to receive the task. Since there is no waiting thread, false is returned and a new thread is created every time.
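The difference can be observed directly by reading getPoolSize() after submitting five tasks with each queue type. This is a rough sketch using the same pool settings as the question (corePoolSize 0, maximumPoolSize Integer.MAX_VALUE); the class and method names are hypothetical, and the tasks block on a latch instead of sleeping so that the reading is taken while all five are still pending.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueComparison {

    // Submits five blocked tasks and reports how many threads the pool created.
    static int poolSizeAfterFiveTasks(BlockingQueue<Runnable> queue) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, queue);

        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // stay busy until the pool size is read
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int poolSize = pool.getPoolSize();

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return poolSize;
    }

    static int withLinkedBlockingQueue() {
        return poolSizeAfterFiveTasks(new LinkedBlockingQueue<>());
    }

    static int withSynchronousQueue() {
        return poolSizeAfterFiveTasks(new SynchronousQueue<>());
    }

    public static void main(String[] args) {
        System.out.println("LinkedBlockingQueue pool size: " + withLinkedBlockingQueue());
        System.out.println("SynchronousQueue pool size:    " + withSynchronousQueue());
    }
}
```

With the unbounded LinkedBlockingQueue every offer() succeeds, so the pool stays at 1 thread and the tasks run one after another. With SynchronousQueue each offer() fails because every existing worker is busy, so the pool grows to 5 threads.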