
Why does ThreadPoolExecutor take a BlockingQueue as its argument?


I have tried creating and executing ThreadPoolExecutor with

    int poolSize = 2;
    int maxPoolSize = 3;
    ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

If I keep submitting a 7th, 8th, ... task with

  threadPool.execute(task);   

after the queue has reached its maximum size, it starts throwing "RejectedExecutionException", which means those tasks are lost.

So what is the role of the BlockingQueue if it drops tasks? Why doesn't it wait?

From the definition of BlockingQueue

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.


Why can't we use a LinkedList (a normal queue implementation) instead of a blocking queue?

Kanagavelu Sugumar asked Sep 26 '11 14:09





1 Answer

The problem occurs because your task queue is too small, as the documentation of the execute method indicates:

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.
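To see that capacity limit in action, here is a minimal sketch using the pool sizes from the question. The class name RejectionDemo, the method firstRejectedTask, and the latch that keeps every task running are assumptions made for illustration; they are not part of the original code. Because no task can finish until the latch is released, the pool fills up deterministically.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the original question.
class RejectionDemo {

    /**
     * Submits up to `attempts` tasks that cannot finish until released and
     * returns the 1-based index of the first rejected task, or -1 if none.
     */
    static int firstRejectedTask(int attempts) {
        CountDownLatch release = new CountDownLatch(1);
        // Same sizes as in the question: core 2, max 3, queue capacity 2.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 3, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2));
        int firstRejected = -1;
        try {
            for (int i = 1; i <= attempts; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the task running
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    firstRejected = i; // default AbortPolicy threw
                    break;
                }
            }
        } finally {
            release.countDown(); // let all accepted tasks finish
            pool.shutdown();
        }
        return firstRejected;
    }

    public static void main(String[] args) {
        // Prints "First rejected task: 6"
        System.out.println("First rejected task: " + firstRejectedTask(8));
    }
}
```

Tasks 1 and 2 start the two core threads, tasks 3 and 4 fill the queue, task 5 starts the third thread, and task 6 has nowhere to go. If tasks finish while you are still submitting, rejection can start later than the 6th task.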

So the first problem is that you're setting your queue size to a very small number:

    int poolSize = 2;
    int maxPoolSize = 3;
    ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);

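You then state that "if [I] try 7th, 8th... task" you get a RejectedExecutionException. That is expected: with at most 3 threads and a queue of 2, the executor can hold only 5 unfinished tasks at once, so any further task submitted before one finishes is rejected. There are two ways to resolve your problem (I would recommend doing both):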

  1. Increase the size of the queue.
  2. Catch the exception and re-try adding the task.

You should have something along the lines of this:

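    public void executeTask(MyRunnableTask task) {
        boolean taskAdded = false;
        while (!taskAdded) {
            try {
                executor.execute(task);
                taskAdded = true;
            } catch (RejectedExecutionException ex) {
                // Pool and queue are full: loop and try again.
                // Note: this also retries forever if the executor was shut down.
                taskAdded = false;
            }
        }
    }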

Now, to address your other questions...

Here then what is the role of BlockingQueue if it is missing the tasks?

The role of the BlockingQueue is to complete the Producer/Consumer pattern: the threads calling execute produce tasks, and the pool's worker threads consume them. If the queue is large enough, you shouldn't see the issues you're encountering. As I mentioned above, you need to increase the queue size, catch the exception, and then retry executing the task.

Why can't we go for a LinkedList?

A linked list is neither thread-safe nor blocking. The Producer/Consumer pattern tends to work best with a blocking queue.
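This also hints at why execute does not wait. As far as I can tell from the OpenJDK source, ThreadPoolExecutor hands tasks to the queue with the non-blocking offer(), while idle worker threads wait for work with take() or a timed poll(). So the blocking side of the queue serves the consumers, not the producer. The sketch below shows both behaviors on a plain ArrayBlockingQueue; the class and method names are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the original question.
class QueueOperationsDemo {

    /** offer() on a full queue returns false immediately instead of waiting. */
    static boolean offerToFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first");         // fills the single slot
        return queue.offer("second"); // no space left, returns false at once
    }

    /** A consumer waits until a producer thread supplies an element. */
    static String consumerWaitsForProducer() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);   // the consumer is already waiting by now
                queue.put("task");   // put() would block if the queue were full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            // Timed poll() blocks until an element arrives or the timeout expires.
            return queue.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer on full queue: " + offerToFullQueue());
        System.out.println("consumer received: " + consumerWaitsForProducer());
    }
}
```

A java.util.LinkedList offers neither guarantee: a worker that finds it empty would have to spin or sleep, and concurrent access would need external locking.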

Update

Please don't be offended by the following statements, I'm intentionally using more stringent language in order to put emphasis on the fact that your first assumption should never be that there is something wrong with the library you're using (unless you wrote the library yourself and you know that there is a specific problem in it)!

So let's put this concern to rest right now: neither the ThreadPoolExecutor nor the Java library is the problem here. It's entirely your (mis)use of the library that's causing the problem. Javamex has a great tutorial explaining the exact situation you're seeing.

There could be several reasons why you're filling up the queue faster than you're emptying it:

  1. The thread that's adding tasks for executing is adding them too fast.
  2. The tasks are taking too long to execute.
  3. Your queue is too small.
  4. Any combination of the above 3.

There are a bunch of other reasons too, but I think the above would be the most common.

I would give you a simple solution with an unbounded queue, but it would NOT resolve your (mis)use of the library. So before we go blaming the Java library, let's see a concise example that demonstrates the exact problem you're encountering.

Update 2.0

Here are a couple of other questions addressing the specific problem:

  1. ThreadPoolExecutor Block When Queue Is Full?
  2. How to make ThreadPoolExecutor's submit() method block if it is saturated?
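One approach commonly suggested for this is a custom RejectedExecutionHandler that waits for queue space instead of throwing. The following is only a sketch under assumptions: the handler name BLOCK_WHEN_FULL, the class BlockingSubmitDemo, and the 10 ms dummy tasks are invented for illustration. Putting tasks into the queue directly bypasses some of the executor's own state checks, so treat it as a starting point rather than a finished solution.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not from the original answer.
class BlockingSubmitDemo {

    /** Hypothetical handler: wait for space in the work queue instead of throwing. */
    static final RejectedExecutionHandler BLOCK_WHEN_FULL = (task, executor) -> {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down");
        }
        try {
            executor.getQueue().put(task); // blocks the submitting thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
        }
    };

    /** Submits `taskCount` short tasks and returns how many completed. */
    static int runAll(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        // Same sizes as in the question, plus the blocking handler.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 3, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2), BLOCK_WHEN_FULL);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(10); // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runAll(20));
    }
}
```

With this handler the submitting thread slows down to the pace of the workers, so none of the 20 tasks is dropped. The built-in ThreadPoolExecutor.CallerRunsPolicy gives a similar back-pressure effect by running the rejected task in the submitting thread.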
Kiril answered Oct 12 '22 18:10
