
Queue full error, working with multiple consumers, producers

I would like to simulate the following scenario: multiple consumer and producer threads working on modifying some data as

Setup

    BlockingQueue<String> q1 = new SynchronousQueue<String>();
    BlockingQueue<String> q2 = new SynchronousQueue<String>();

    Producer dataProducer = new Producer(q1); // publish to q1

    Filter1 filter1 = new Filter1(q1, q2);    // read from q1, publish to q2
    Filter2 filter2 = new Filter2(q2);        // read from q2

    new Thread(dataProducer, "Producer-Thread").start();
    new Thread(filter1, "Filter1-Thread").start();
    new Thread(filter2, "Filter2-Thread").start();

Producer

public void run() {
    try {
        while (true) {
            this.q.put(saySomething());
        }
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

public String saySomething() {
    return "Something";
}

Filter 1

public void run() {
    try {
        while (true) {
            consume(qIn.take());
        }
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

private void consume(String take) {
    //Modify data according to some rules
    String newData = take.replace("m", "-");
    produce(newData);
}

private void produce(String newData) {
    // put new data in queue out
    qOut.add(newData);                   // <-- Stacktrace points here
}

Filter 2

public void run() {
    try {
        while (true) {
            consume(qIn.take());
        }
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

private void consume(String s) {
    System.out.println("Something became: " + s);
}

So, to recap: Producer puts something in a queue from which Filter 1 reads. It modifies the data and publishes it to another queue from which Filter 2 reads. Filter 2 prints the final data.

This code fails with

Exception in thread "Thread-2" java.lang.IllegalStateException: Queue full

Could you please help me understand why?

James Raitsev asked Feb 01 '12 03:02





1 Answer

You should be using put(), not add(). A SynchronousQueue has no internal capacity, so it is effectively always full and always empty at the same time. put() suspends the calling thread until another thread arrives to take the element off the queue.

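The add() method only succeeds if another thread is already waiting to receive the element (for example, blocked in take()). If no thread is waiting, it throws IllegalStateException: Queue full. In your pipeline, Filter 2 is not always blocked in take() at the moment Filter 1 calls add(), for example while it is still printing the previous item, so the exception is only a matter of timing.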
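The difference can be seen in isolation with a small sketch. The class name SynchronousQueueDemo and the helper methods addWithoutConsumer() and putWithConsumer() are made up for illustration and are not part of the question's code; only the java.util.concurrent calls are real.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the original pipeline.
public class SynchronousQueueDemo {

    // Calls add() while no consumer is waiting and returns the exception message.
    static String addWithoutConsumer() {
        BlockingQueue<String> q = new SynchronousQueue<>();
        try {
            q.add("Something");          // nobody is in take(), so this throws
            return "added";
        } catch (IllegalStateException ex) {
            return ex.getMessage();
        }
    }

    // Calls put(), which blocks until the consumer thread takes the element.
    static String putWithConsumer() {
        BlockingQueue<String> q = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = q.take();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer-Thread");
        consumer.start();
        try {
            q.put("Something");          // waits for the hand-off, never throws "Queue full"
            consumer.join();             // join() makes received[0] visible to this thread
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return null;
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("add() without consumer: " + addWithoutConsumer());
        System.out.println("put() with consumer: " + putWithConsumer());
    }
}
```

Applied to the question, this suggests replacing qOut.add(newData) in Filter 1's produce() with qOut.put(newData) and letting the InterruptedException propagate to the try/catch in run(). If dropping data is acceptable, offer() is another option: it returns false instead of throwing when no consumer is waiting.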

John Vint answered Nov 15 '22 00:11
