Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Synchronizing queue in Java on multiple threads

I understand the concept of synchronizing, but I'm now sure about why is it implemented that way, so I need a little help in here:

I have 2 threads:

PeriodicalThread will receive data periodically (let's say each 5 seconds) and put it in a Queue (using ArrayDeque for the moment, but I don't know if any other Queue implementation will work better)

ProccessThread will constantly check on the Queue to see if it is empty. If it is not empty, it will process the data (FIFO).

So, at first my implementation would be:

// Both threads are inner class so they have access to Queue

private Queue queue;
private boolean isReadyToProccess;


class PeriodicalThread extends Thread {
    public void run() {
        while(true) {
           if(isNewDataAvailable) {
                // create Data object
                queue.add(data);
           }
        }
    }
}

class ProcessThread extends Thread {
    public void run() {
        while(true) {
           if(!queue.isEmpty() && isReadyToProccess) {
               Data data = queue.poll();
               processData(data);
           }
        }
    }
}

private void processData(Data data) {
    // this method send data over network, and the server response callback
    // changes isReadyToProcess value to true.
}

Then when wanting to handle synchronization, I wouldn't know If I should use a lock object (and how is it implemented) or if there is already a package Queue implementation that is thread-safe (because of the add() and poll() methods)

Edit: I forgot about the flag isReadyToProcess indicating next queue Data object is... well, ready to be proccessed. This flag should be synchronized also.

like image 480
Christopher Francisco Avatar asked Jun 06 '14 15:06

Christopher Francisco


2 Answers

ArrayDeque doesn't support concurrency. Instead, use a real queue that supports concurrent work like BlockingQueue and one of its implementations in java.util.concurrent package. I recommend using LinkedBlockingQueue.

In case you need to share flags between your threads, it would be better using AtomicBoolean instead of manually synchronizing a primitive boolean field.

Note: if you will work with concurrent process, it is better to work with classes provided by java.util.concurrent package that already support lock and synchronization out-of-the-box.

like image 198
Luiggi Mendoza Avatar answered Nov 04 '22 09:11

Luiggi Mendoza


You are looking for Blocking Queue implementation

This provides the functionality out of box which you are looking for. Thats why its best suited for producer consumer examples.

Here is an example

public class BlockingQueueExample {

    public static void main(String[] args) throws Exception {

        BlockingQueue queue = new ArrayBlockingQueue(1024);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();

        Thread.sleep(4000);
    }
}


public class Producer implements Runnable{

    protected BlockingQueue queue = null;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


public class Consumer implements Runnable{

    protected BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
like image 26
M Sach Avatar answered Nov 04 '22 10:11

M Sach