I'd like to create some sort of Producer/Consumer threading app, but I'm not sure what the best way is to implement a queue between the two.
So I've come up with two ideas (both of which could be entirely wrong). I would like to know which would be better and, if they both suck, what the best way to implement the queue would be. It's mainly my implementation of the queue in these examples that I'm concerned about. I'm extending a Queue class that is an in-house class and is thread safe. Below are two examples with 4 classes each.
Main class-
public class SomeApp {
    // static so that the static main method can assign them
    private static Consumer consumer;
    private static Producer producer;

    public static void main(String[] args) {
        consumer = new Consumer();
        producer = new Producer();
    }
}
Consumer class-
public class Consumer implements Runnable {
    public Consumer() {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run() {
        while (true) {
            // get an object off the queue
            Object object = QueueHandler.dequeue();
            // do some stuff with the object
        }
    }
}
Producer class-
public class Producer implements Runnable {
    public Producer() {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run() {
        while (true) {
            // add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}
Queue class-
public class QueueHandler {
    // This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object) {
        // do some stuff
        readQ.add(object);
    }

    public static Object dequeue() {
        // do some stuff
        return readQ.get();
    }
}
OR
Main class-
public class SomeApp {
    // static so that the static main method can assign them
    static Queue<Object> readQ;
    private static Consumer consumer;
    private static Producer producer;

    public static void main(String[] args) {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
}
Consumer class-
public class Consumer implements Runnable {
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ) {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run() {
        while (true) {
            // get an object off the queue
            Object object = queue.dequeue();
            // do some stuff with the object
        }
    }
}
Producer class-
public class Producer implements Runnable {
    Queue<Object> queue;

    public Producer(Queue<Object> readQ) {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run() {
        while (true) {
            // add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}
Queue class-
// the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object> {
    public QueueHandler(int size) {
        super(size); // All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object) {
        // do some stuff
        add(object); // inherited from Queue; there is no readQ field in this version
    }

    public Object dequeue() {
        // do some stuff
        return get(); // inherited from Queue
    }
}
And go!
The producer-consumer pattern is a concurrency design pattern in which one or more producer threads produce objects that are queued up and then consumed by one or more consumer threads. The enqueued objects often represent some work that needs to be done.
The producer-consumer problem is one of a small collection of standard, well-known problems in concurrent programming. It involves a finite-size buffer and two classes of threads: producers, which put items into the buffer, and consumers, which take items out of it.
The producer must ensure that no element is added when the buffer is full; it should call wait() until a consumer consumes some data and notifies the producer thread. Likewise, the consumer must not try to remove an item from the buffer when it is already empty; it should call wait(), which simply waits until the producer ...
The problem describes two processes, the producer and the consumer, which share a common, fixed-size buffer used as a queue. The producer's job is to generate data, put it into the buffer, and start again. At the same time, the consumer is consuming the data (i.e. removing it from the buffer), one piece at a time.
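To make the wait()/notify description above concrete, here is a minimal sketch of a fixed-size buffer guarded by its own monitor. The names BoundedBuffer and BoundedBufferDemo are made up for illustration and are not the in-house Queue class from the question; the sketch uses notifyAll() rather than notify() so that both producers and consumers get woken.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical bounded buffer; not the in-house Queue from the question.
class BoundedBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Blocks while the buffer is full, then wakes any waiting threads.
    public synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait();
        }
        items.addLast(item);
        notifyAll();
    }

    // Blocks while the buffer is empty, then wakes any waiting threads.
    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.removeFirst();
        notifyAll();
        return item;
    }
}

class BoundedBufferDemo {
    // Moves the integers 1..count through the buffer and returns their sum.
    static int transfer(int count, int capacity) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            int sum = 0;
            for (int i = 0; i < count; i++) {
                sum += buffer.take();
            }
            producer.join();
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(transfer(100, 4));
    }
}
```

The wait() calls sit inside while loops rather than if statements, so a thread re-checks the condition after every wake-up. In practice java.util.concurrent.ArrayBlockingQueue already provides this behaviour.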
Java 5+ has all the tools you need for this kind of thing. You will want to:
1. Put all your producers in one ExecutorService;
2. Put all your consumers in another ExecutorService;
3. If necessary, communicate between the two using a BlockingQueue.
I say "if necessary" for (3) because from my experience it's an unnecessary step. All you do is submit new tasks to the consumer executor service. So:
final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
    producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
So the producers submit directly to consumers.
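For the cases where step (3) is wanted after all, a rough sketch of handing items from one executor to the other through a BlockingQueue might look like the following. The class name QueueHandoffDemo, the queue capacity of 10 and the "poison" sentinel that tells the consumer to stop are all assumptions made for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class QueueHandoffDemo {
    // Sentinel meaning "no more work is coming" (an assumption of this sketch).
    private static final Integer POISON = Integer.MIN_VALUE;

    // Produces 1..count on one executor, sums them on another, returns the sum.
    static int run(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        ExecutorService producers = Executors.newSingleThreadExecutor();
        ExecutorService consumers = Executors.newSingleThreadExecutor();
        try {
            // Returning null makes this lambda a Callable, so put() may throw.
            producers.submit(() -> {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON);
                return null;
            });
            Future<Integer> total = consumers.submit(() -> {
                int sum = 0;
                // take() blocks while the queue is empty
                for (Integer item = queue.take(); !item.equals(POISON); item = queue.take()) {
                    sum += item;
                }
                return sum;
            });
            return total.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            producers.shutdown();
            consumers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(100));
    }
}
```

With several consumers, one sentinel per consumer would be needed, which is part of why submitting tasks directly to the consumer executor is often simpler.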
OK, as others note, the best thing to do is to use the java.util.concurrent package. I highly recommend "Java Concurrency in Practice". It's a great book that covers almost everything you need to know.
As for your particular implementation, as I noted in the comments, don't start Threads from Constructors -- it can be unsafe.
Leaving that aside, the second implementation seems better. You don't want to put queues in static fields. You are probably just losing flexibility for nothing.
If you want to go ahead with your own implementation (for learning purposes, I guess?), supply a start() method at least. You should construct the object (you can instantiate the Thread object), and then call start() to start the thread.
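As a rough illustration of that advice, the consumer below creates its Thread in the constructor but only starts it from a separate start() method. StartableConsumer, its stop() method and the use of a java.util.concurrent BlockingQueue in place of the in-house Queue are assumptions of this sketch, not part of the question's code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class StartableConsumer implements Runnable {
    private final BlockingQueue<String> queue;
    private final AtomicInteger consumed = new AtomicInteger();
    private final Thread thread;

    StartableConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
        // Creating the Thread here is fine; it is not started until start().
        this.thread = new Thread(this, "consumer");
    }

    public void start() {
        thread.start();
    }

    // Interrupts the worker and waits for it to finish.
    public void stop() throws InterruptedException {
        thread.interrupt();
        thread.join();
    }

    public int consumedCount() {
        return consumed.get();
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                queue.take(); // blocks until an item is available
                consumed.incrementAndGet();
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: fall through and let the thread end.
        }
    }
}

class StartableConsumerDemo {
    // Queues three items, starts the consumer, waits for them to be consumed.
    static int consumeThree() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        StartableConsumer consumer = new StartableConsumer(queue);
        queue.add("a");
        queue.add("b");
        queue.add("c");
        consumer.start(); // the object is fully constructed before the thread runs
        try {
            long deadline = System.currentTimeMillis() + 5000;
            while (consumer.consumedCount() < 3 && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            consumer.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumer.consumedCount();
    }

    public static void main(String[] args) {
        System.out.println(consumeThree());
    }
}
```

Starting the thread inside the constructor would let run() observe the object before the constructor (or a subclass constructor) has finished, which is the hazard the separate start() method avoids.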
Edit: ExecutorServices have their own queues, so this can be confusing. Here's something to get you started.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        // The numbers are just silly tuning parameters. Refer to the API.
        // The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1, 4, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(100));

        // No need to bound the queue for this executor.
        // Use the utility method instead of the complicated constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);
    }
}

// Pancake and Pan are placeholder types that are not defined here.
class Produce implements Runnable {
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }
}

class Consume implements Runnable {
    private final Pancake cake;

    public Consume(Pancake cake) {
        this.cake = cake;
    }

    @Override
    public void run() {
        cake.eat();
    }
}
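One thing worth knowing about the bounded queue: once every thread is busy and the queue is full, a ThreadPoolExecutor with the default handler rejects further submissions with a RejectedExecutionException. The sketch below shrinks the pool to one thread and one queue slot so the effect is easy to see; BoundedExecutorDemo and those sizes are chosen purely for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedExecutorDemo {
    // Returns true if the third submission is rejected.
    static boolean thirdTaskRejected() {
        CountDownLatch release = new CountDownLatch(1);
        // One worker thread and room for exactly one waiting task.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1));
        Runnable blocked = () -> {
            try {
                release.await(); // keeps the task alive until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.submit(blocked); // occupies the single worker thread
            pool.submit(blocked); // fills the one queue slot
            pool.submit(blocked); // nowhere to go: the default handler throws
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(thirdTaskRejected());
    }
}
```

If the producer should slow down instead of failing, one option is to pass new ThreadPoolExecutor.CallerRunsPolicy() as the last constructor argument, which makes the submitting thread run the task itself.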
Further EDIT: For the producer, instead of while(true), you can do something like:
@Override
public void run() {
    while (!Thread.currentThread().isInterrupted()) {
        // do stuff
    }
}
This way you can shut down the executor by calling .shutdownNow(). If you used while(true), it wouldn't shut down.
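Here is a small sketch of that behaviour: the task loops until its thread is interrupted, and shutdownNow() delivers the interrupt. InterruptibleLoopDemo and the 50 ms pause are made up for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class InterruptibleLoopDemo {
    // Returns true if the executor terminates after shutdownNow().
    static boolean stopsAfterShutdownNow() {
        AtomicLong iterations = new AtomicLong();
        ExecutorService producer = Executors.newSingleThreadExecutor();
        producer.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                iterations.incrementAndGet(); // stand-in for "do stuff"
            }
        });
        try {
            Thread.sleep(50); // give the loop a moment to run
            producer.shutdownNow(); // interrupts the running task
            return producer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(stopsAfterShutdownNow());
    }
}
```

If the loop body calls a blocking method that throws InterruptedException, the catch block should either exit the loop or call Thread.currentThread().interrupt() again, since catching the exception clears the interrupt flag.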
Also note that the Producer is still vulnerable to RuntimeExceptions (i.e. one RuntimeException will halt the processing).
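One way to guard against that, sketched here under made-up names, is to catch RuntimeException around each unit of work so that a single bad item is recorded and the loop carries on. ResilientWorker, the String items and the Integer.parseInt stand-in for real work are all assumptions of this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class ResilientWorker implements Runnable {
    private final BlockingQueue<String> queue;
    final AtomicInteger processed = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    ResilientWorker(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        String item;
        // For brevity this sketch stops when the queue is empty.
        while ((item = queue.poll()) != null) {
            try {
                Integer.parseInt(item); // stand-in work; throws NumberFormatException on bad input
                processed.incrementAndGet();
            } catch (RuntimeException e) {
                // Record the failure and keep going instead of ending the loop.
                failed.incrementAndGet();
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("1");
        queue.add("oops");
        queue.add("3");
        ResilientWorker worker = new ResilientWorker(queue);
        worker.run();
        System.out.println(worker.processed.get() + " processed, " + worker.failed.get() + " failed");
    }
}
```

Real code would normally log the exception rather than only count it.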