My understanding of the Producer-Consumer pattern is that it can be implemented using a queue shared between the producer and the consumer: the producer submits work to the shared queue, and the consumer retrieves and processes it. It can also be implemented by the producer submitting directly to the consumer (producer threads submitting to the consumer's ExecutorService directly).
Now, I've been looking at the Executors class that provides some common implementations of thread pools. The method newFixedThreadPool, according to the spec, "reuses a fixed number of threads operating off a shared unbounded queue". Which queue are they talking about here?
If the Producer directly submits a task to a consumer, is it the internal queue of the ExecutorService that contains the list of Runnables?
Or is it the intermediate queue, in case the producer submits to a shared queue?
Maybe I'm missing the whole point, but would someone please clarify? For reference, the first variant I have in mind looks roughly like the sketch below.
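A minimal sketch of that first variant, with an explicit intermediate queue (the class name and the work item are just placeholders):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SharedQueueExample {
    public static void main(String[] args) {
        // Intermediate queue shared between producer and consumer.
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    queue.put("work item");        // producer submits work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    String item = queue.take();    // consumer blocks until work arrives
                    System.out.println("processed: " + item);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        producer.start();
        consumer.start();
    }
}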
In terms of resources, newFixedThreadPool keeps all of its threads running until the pool is explicitly shut down. With newCachedThreadPool, threads that have been idle for sixty seconds are terminated and removed from the cache. Given this, resource consumption depends very much on the situation.
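A minimal sketch of that difference (the pool size is arbitrary):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolLifecycles {
    public static void main(String[] args) {
        // Fixed pool: up to 4 threads are created and stay alive until shutdown().
        ExecutorService fixed = Executors.newFixedThreadPool(4);

        // Cached pool: threads are created on demand; idle threads are reclaimed
        // after 60 seconds.
        ExecutorService cached = Executors.newCachedThreadPool();

        fixed.shutdown();
        cached.shutdown();
    }
}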
newFixedThreadPool:

public static ExecutorService newFixedThreadPool(int nThreads)

Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks.
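A small sketch of what that means in practice (thread and task counts are arbitrary): with two threads and five tasks, at most two tasks run at once and the rest wait in the pool's internal unbounded queue.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedPoolQueueing {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            pool.submit(new Runnable() {
                public void run() {
                    // At most 2 of these run concurrently; the others wait in the queue.
                    System.out.println("task " + taskId + " on "
                            + Thread.currentThread().getName());
                }
            });
        }
        pool.shutdown();
    }
}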
And here is a code example using newSingleThreadExecutor and invokeAll:

ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});

List<Future<String>> futures = executorService.invokeAll(callables);
for (Future<String> future : futures) {
    System.out.println(future.get());
}
executorService.shutdown();
Executor just executes stuff you give it. ExecutorService adds startup, shutdown, and the ability to wait for and look at the status of jobs you've submitted for execution on top of Executor (which it extends). This is a perfect answer, short and clear.
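A small sketch of that difference (the task bodies are just placeholders):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ExecutorVsExecutorService {
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newSingleThreadExecutor();

        // execute(Runnable) comes from the Executor interface: fire and forget.
        service.execute(new Runnable() {
            public void run() {
                System.out.println("executed");
            }
        });

        // submit(), shutdown() and awaitTermination() are added by ExecutorService.
        Future<Integer> result = service.submit(new Callable<Integer>() {
            public Integer call() {
                return 42;
            }
        });
        System.out.println(result.get());   // blocks until the task has finished

        service.shutdown();
        service.awaitTermination(1, TimeUnit.MINUTES);
    }
}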
You are right, ExecutorService is not only a thread pool; it is a full Producer-Consumer implementation. The internal queue is in fact a thread-safe queue of Runnables (FutureTasks, to be precise) holding the tasks you submit().

All the threads in the pool are blocked on that queue, waiting for tasks to execute. When you submit() a task, exactly one thread will pick it up and run it. Of course, submit() does not wait for a thread in the pool to finish processing.
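A small sketch of that behaviour (the sleep is just a stand-in for real work):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDoesNotBlock {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);

        // submit() wraps the task in a FutureTask, puts it on the internal queue
        // and returns immediately.
        Future<String> future = pool.submit(new Callable<String>() {
            public String call() throws Exception {
                Thread.sleep(2000);            // simulate slow work
                return "done";
            }
        });

        System.out.println("submit() returned immediately");
        System.out.println(future.get());      // only get() waits for the result
        pool.shutdown();
    }
}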
On the other hand, if you submit a huge number of tasks (or long-running ones), you might end up with all threads in the pool occupied and some tasks waiting in the queue. As soon as a thread finishes its task, it immediately picks the next one from the queue.
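To make that internal queue visible, the same pool can be built directly as a ThreadPoolExecutor, which is essentially what Executors.newFixedThreadPool(2) does. A sketch:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueFillsUp {
    public static void main(String[] args) {
        // Equivalent of Executors.newFixedThreadPool(2), but with the work queue visible.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

        for (int i = 0; i < 6; i++) {
            pool.submit(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(1000);   // keep both worker threads busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        // With both threads occupied, the remaining tasks wait in the queue.
        System.out.println("tasks waiting in the queue: " + pool.getQueue().size());
        pool.shutdown();
    }
}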
Here is a simple Producer-Consumer example using a ScheduledExecutorService and a shared list:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Producer implements Runnable {

    static List<String> list = new ArrayList<String>();

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(12);
        int initialDelay = 5;
        int pollingFrequency = 5;

        Producer producer = new Producer();
        executor.scheduleWithFixedDelay(producer, initialDelay,
                pollingFrequency, TimeUnit.SECONDS);

        for (int i = 0; i < 3; i++) {
            Consumer consumer = new Consumer();
            executor.scheduleWithFixedDelay(consumer, initialDelay,
                    pollingFrequency, TimeUnit.SECONDS);
        }
    }

    @Override
    public void run() {
        // Adding to the list must be synchronized because the consumers lock on it too;
        // producing can also slow down while a consumer holds the lock.
        synchronized (list) {
            list.add("object added to list is " + System.currentTimeMillis());
        }
    }
}

class Consumer implements Runnable {

    @Override
    public void run() {
        getObjectFromList();
    }

    private void getObjectFromList() {
        synchronized (Producer.list) {
            if (Producer.list.size() > 0) {
                System.out.println("Object name removed by "
                        + Thread.currentThread().getName() + " is "
                        + Producer.list.get(0));
                Producer.list.remove(0);
            }
        }
    }
}
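As a side note, the shared ArrayList plus the manual synchronized blocks in this example could be replaced by a BlockingQueue such as LinkedBlockingQueue; its offer() and poll() methods are already thread-safe, so no explicit locking is needed, which is essentially what the ExecutorService's own internal work queue does for you.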