I have several workers, that use ArrayBlockingQueue.
Every worker takes one object from queue, process it, and in result can get several objects, that will be put into queue for further processing. So, worker = producer + consumer.
Worker:
public class Worker implements Runnable
{
private BlockingQueue<String> processQueue = null;
public Worker(BlockingQueue<String> processQueue)
{
this.processQueue = processQueue;
}
public void run()
{
try
{
do
{
String item = this.processQueue.take();
ArrayList<String> resultItems = this.processItem(item);
for(String resultItem : resultItems)
{
this.processQueue.put(resultItem);
}
}
while(true);
}
catch(Exception)
{
...
}
}
private ArrayList<String> processItem(String item) throws Exception
{
...
}
}
Main:
public class Test
{
public static void main(String[] args) throws Exception
{
new Test().run();
}
private void run() throws Exception
{
BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000);
processQueue.put("lalala");
Executor service = Executors.newFixedThreadPool(100);
for(int i=0; i<100; ++i)
{
service.execute(new Worker(processQueue));
}
}
}
Whats is the best way to stop workers, when there is no more work ?
First, what I have in mind, is to check periodically how many items in queue and how many items are currently in process. If both are equal to zero, then do something like "shutdownNow()" on ExecutorService. But I am not sure this is the best way.
The producer and consumer must agree upon an object (or an attribute in the object) that represents end of input. Then the producer sets that attribute in the last packet, and the consumer stops consuming it.
You can use wait, notify, and notifyAll methods to communicate between threads in Java. For example, if you have two threads running in your programs like Producer and Consumer then the producer thread can communicate to the consumer that it can start consuming now because there are items to consume in the queue.
The solution of Producer-Consumer Problem using Semaphore Binary Semaphore: In Binary Semaphore, only two processes can compete to enter into its CRITICAL SECTION at any point in time, apart from this the condition of mutual exclusion is also preserved.
In computing, the producer-consumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, which share a common, fixed-size buffer used as a queue.
If there's no more work to do, put a message into the queue saying so and have the workers shut themselves down at their own convenience. This is a good way to prevent data corruption.
If you need to notify another thread that all the workers have gone home, you can use a CountDownLatch
to do so.
Sounds like you have your solution--use a separate in-progress queue, the size of which will be the number of items currently being processed. If you use the convention that accesses to either queue is in synchronized(theArrayBlockingQueue)
blocks then all should be well. In particular, when moving an item to the processing state, remove it from theArrayBlockingQueue and add it to the processingQueue within the same synchronized block.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With