Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Creating a Synchronized Buffer for Producer/Consumer pattern in Java

I implemented a buffer for the producer/consumer pattern, however, it seems that the Consumer never acquires the lock so Starvation occurs. I can't identify why this happens since both put() and take() seem to release the lock properly...

I know there is BlockingQueue and other nice implementations, but I want to implement this using wait() and notify() as an exercise.

public class ProducerConsumerRaw {

    public static void main(String[] args) {
        IntBuffer buffer = new IntBuffer(8);

        ConsumerRaw consumer = new ConsumerRaw(buffer);
        ProducerRaw producer = new ProducerRaw(buffer);

        Thread t1 = new Thread(consumer);
        Thread t2 = new Thread(producer);

        t1.start();
        t2.start();

    }
}

class ConsumerRaw implements Runnable{
    private final IntBuffer buffer;

    public ConsumerRaw(IntBuffer b){
        buffer = b;
    }

    public void run() {
        while(!buffer.isEmpty()) {
            int i = buffer.take();
            System.out.println("Consumer reads "+i); // this print may not be in the order
        }
    }
}


class ProducerRaw implements Runnable{
    private final IntBuffer buffer;

    ProducerRaw(IntBuffer b) {
        this.buffer = b;
    }

    public void run(){
        for (int i = 0; i < 20; i++) {
            int n = (int) (Math.random()*100);
            buffer.put(n);
            System.out.println("Producer puts "+n);
        }
    }
}


class IntBuffer{

    private final int[] storage;
    private volatile int end;
    private volatile int start;

    public IntBuffer(int size) {
        this.storage = new int[size];
        end = 0;
        start = 0;
    }


    public void put(int n) { // puts add the END
        synchronized(storage) {
            boolean full = (start == (end+storage.length+1)%storage.length);
            while(full){ // queue is full
                try {
                    storage.notifyAll(); 
                    storage.wait();

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            this.storage[end] = n;
            end = incrementMod(end);
            storage.notifyAll();
        }
    }


    public int take(){

        synchronized(storage) {
            while (end == start) { // empty queue
                try {
                    storage.notifyAll(); // notify waiting producers
                    storage.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            int index = start;
            start = incrementMod(start);
            storage.notifyAll(); // notify waiting producers
            return this.storage[index];

        }
    }

    private int incrementMod(int index) {
        synchronized (storage) {
            if (index == storage.length-1) return 0;
            else return index+1;
        }
    }

    public boolean isEmpty(){
        synchronized (storage) {
            return (start == end);
        }
    }

}
like image 473
ksiomelo Avatar asked Jun 21 '13 21:06

ksiomelo


People also ask

What are the different ways to solve producer-consumer problem in Java?

We can solve the producer-consumer problem using semaphores, Semaphore is a thread synchronization construct. Semaphore is used to control the access to a shared resource with the help of a variable/counter. The other way to solve this is using a thread. A thread is a sequential flow of control in a program.

How can you solve consumer producer problem by using wait () and notify () method?

Producer must ensure that no element should be added when buffer is full, it should call wait() until consumer consume some data and notify to the producer thread AND consumer must ensure that it should not try to remove item from buffer when it is already empty, it should call wait() which simply waits until producer ...

What is producer-consumer problem in threads?

The producer and consumer problem is one of the small collection of standard, well-known problems in concurrent programming. A finite-size buffer and two classes of threads, producers and consumers, put items into the buffer (producers) and take items out of the buffer (consumers).

What is Producer consumer in Java?

Producer and Consumer are two separate processes. Both processes share a common buffer or queue. The producer continuously produces certain data and pushes it onto the buffer, whereas the consumer consumes those data from the buffer.


1 Answers

This is at least one problem, in your put method:

boolean full = (start == (end+storage.length+1)%storage.length);           
while(full){ // queue is full
    // Code that doesn't change full
}

If full is ever initialized as true, how do you expect the loop to end?

The other problem is this loop, in the consumer:

while(!buffer.isEmpty()) {
    int i = buffer.take();
    System.out.println("Consumer reads "+i);
}

You're assuming the producer never lets the buffer get empty - if the consumer starts before the producer, it will stop immediately.

Instead, you want some way of telling the buffer that you've stopped producing. The consumer should keep taking until the queue is empty and won't receive any more data.

like image 198
Jon Skeet Avatar answered Sep 30 '22 11:09

Jon Skeet