Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Producer Consumer - Using Executors.newFixedThreadPool

My understanding of a Producer-Consumer pattern is that it could be implemented using a queue shared between the producer and the consumer. Producer submits work to a shared queue, consumer retrieves it and processes it. It could also be implemented by the producer directly submitting to the consumer (Producer threads submitting to Consumer's executor service directly). 

Now, I've been looking at the Executors class that provides some common implementations of thread pools. The method newFixedThreadPool, according to the spec, "reuses a fixed number of threads operating off a shared unbounded queue". Which queue are they talking about here? 

If the Producer directly submits a task to a consumer, is it the internal queue of the ExecutorService that contains the list of Runnables?

Or is it the intermediate queue, in case the producer submits to a shared queue? 

May be I'm missing the whole point, but would someone please clarify?

like image 269
Oxford Avatar asked Aug 11 '11 19:08

Oxford


People also ask

What is difference between newFixedThreadPool and newCachedThreadPool?

In terms of resources, the newFixedThreadPool will keep all the threads running until they are explicitly terminated. In the newCachedThreadPool Threads that have not been used for sixty seconds are terminated and removed from the cache. Given this, the resource consumption will depend very much in the situation.

What is executors newFixedThreadPool?

newFixedThreadPool. public static ExecutorService newFixedThreadPool(int nThreads) Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks.

Can you give an example for ExecutorService?

Here is a code example: ExecutorService executorService = Executors. newSingleThreadExecutor(); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables. add(new Callable<String>() { public String call() throws Exception { return "Task 1"; } }); callables.

What is the difference between Executor and ExecutorService?

Executor just executes stuff you give it. ExecutorService adds startup, shutdown, and the ability to wait for and look at the status of jobs you've submitted for execution on top of Executor (which it extends). This is a perfect answer, short and clear.


2 Answers

You are right, ExecutorService is not only a thread pool, but it is a full Producer-Consumer implementation. This internal queue is in fact a thread-safe queue of Runnables (FutureTask to be precise) holding tasks you submit().

All the threads in the pool are blocked on that queue, waiting for tasks to be executed. When you submit() a task, exactly one thread will pick it up and run it. Of course submit() is not waiting for thread in the pool to finish processing.

On the other hand if you submit a huge number of tasks (or long-running ones) you might end-up with all threads in the pool being occupied and some tasks waiting in the queue. Once any thread is done with its task, it will immediately pick the first one from the queue.

like image 124
Tomasz Nurkiewicz Avatar answered Sep 27 '22 18:09

Tomasz Nurkiewicz


public class Producer extends Thread {  
    static List<String> list = new ArrayList<String>();  

    public static void main(String[] args) {  
        ScheduledExecutorService executor = Executors  
                .newScheduledThreadPool(12);  
        int initialDelay = 5;  
        int pollingFrequency = 5;  
        Producer producer = new Producer();  
        @SuppressWarnings({ "rawtypes", "unused" })  
        ScheduledFuture schedFutureProducer = executor.scheduleWithFixedDelay(  
                producer, initialDelay, pollingFrequency, TimeUnit.SECONDS);  
        for (int i = 0; i < 3; i++) {  
            Consumer consumer = new Consumer();  
            @SuppressWarnings({ "rawtypes", "unused" })  
            ScheduledFuture schedFutureConsumer = executor  
                    .scheduleWithFixedDelay(consumer, initialDelay,  
                            pollingFrequency, TimeUnit.SECONDS);  
        }  

    }  

    @Override  
    public void run() {  
        list.add("object added to list is " + System.currentTimeMillis());  
                              ///adding in list become slow also because of synchronized behavior  
    }  
}  

class Consumer extends Thread {  

    @Override  
    public void run() {  
        getObjectFromList();  
    }  

    private void getObjectFromList() {  
        synchronized (Producer.list) {  
            if (Producer.list.size() > 0) {  
                System.out.println("Object name removed by "  
                        + Thread.currentThread().getName() + "is "  
                        + Producer.list.get(0));  
                Producer.list.remove(Producer.list.get(0));  
            }  
        }  
    }  
}  
like image 23
ryan raina Avatar answered Sep 27 '22 18:09

ryan raina