I understand the concept of synchronization, but I'm not sure how it should be implemented in my case, so I need a little help here:
I have 2 threads:
PeriodicalThread will receive data periodically (let's say every 5 seconds) and put it in a Queue (I'm using ArrayDeque for the moment, but I don't know whether another Queue implementation would work better).
ProcessThread will constantly check the Queue to see if it is empty. If it is not empty, it will process the data (FIFO).
So, at first my implementation would be:
// Both threads are inner classes, so they have access to the queue
private Queue<Data> queue;
private boolean isReadyToProcess;

class PeriodicalThread extends Thread {
    public void run() {
        while (true) {
            if (isNewDataAvailable) {
                // create Data object
                queue.add(data);
            }
        }
    }
}

class ProcessThread extends Thread {
    public void run() {
        while (true) {
            if (!queue.isEmpty() && isReadyToProcess) {
                Data data = queue.poll();
                processData(data);
            }
        }
    }
}

private void processData(Data data) {
    // this method sends data over the network, and the server response callback
    // changes the isReadyToProcess value to true.
}
Then, when it comes to synchronization, I don't know whether I should use a lock object (and how that would be implemented) or whether there is already a Queue implementation that is thread-safe (because of the add() and poll() methods).
Edit: I forgot about the flag isReadyToProcess, which indicates that the next Data object in the queue is... well, ready to be processed. This flag should be synchronized as well.
ArrayDeque is not thread-safe. Instead, use a queue designed for concurrent access, such as BlockingQueue and one of its implementations in the java.util.concurrent package. I recommend LinkedBlockingQueue.
If you need to share flags between your threads, it is better to use AtomicBoolean than to manually synchronize a primitive boolean field.
Note: if you are going to work with concurrent code, it is better to use the classes provided by the java.util.concurrent package, which already handle locking and synchronization out of the box.
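As a rough sketch of how this could look for the two threads in the question: the class name QueueSketch, the runDemo helper and the simulated callback inside processData are all made up for illustration, and String stands in for the Data class. The consumer blocks in take() instead of polling isEmpty(), and the flag is claimed with compareAndSet so that only one item is in flight at a time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class QueueSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // true when the previous item has been acknowledged (assumed protocol)
    private final AtomicBoolean readyToProcess = new AtomicBoolean(true);
    private final List<String> processed =
            Collections.synchronizedList(new ArrayList<>());

    // Hypothetical stand-in for the network send plus the server callback.
    private void processData(String data) {
        processed.add(data);
        // In the real code the response callback would set this flag.
        readyToProcess.set(true);
    }

    // Runs one producer and one consumer for `count` items and returns
    // the items in the order they were processed.
    static List<String> runDemo(int count) throws InterruptedException {
        QueueSketch s = new QueueSketch();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    s.queue.put("data-" + i);
                    Thread.sleep(10); // stands in for the 5 second period
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    String data = s.queue.take(); // blocks until data arrives
                    // Wait until the previous send was acknowledged,
                    // then atomically claim the flag.
                    while (!s.readyToProcess.compareAndSet(true, false)) {
                        Thread.sleep(1);
                    }
                    s.processData(data);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return new ArrayList<>(s.processed);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(3)); // prints [data-0, data-1, data-2]
    }
}
```

With a single consumer, the short sleep while waiting on the flag is good enough for a sketch. If the wait can be long, a Semaphore with one permit might be a better fit than spinning on an AtomicBoolean.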
You are looking for a BlockingQueue implementation.
It provides the functionality you are looking for out of the box, which is why it is well suited to producer-consumer scenarios.
Here is an example:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) throws Exception {
        // Bounded queue: put() blocks when it is full, take() blocks when it is empty
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
        Thread.sleep(4000);
    }
}

class Producer implements Runnable {
    private final BlockingQueue<String> queue;

    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            // Add one element per second
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can see it
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            // take() waits until an element is available
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can see it
            Thread.currentThread().interrupt();
        }
    }
}
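The consumer above takes exactly three elements and then finishes. For a consumer that loops the way ProcessThread does in the question, one common way to stop it is a sentinel ("poison pill") value. This is only a sketch: the class name LoopingConsumerSketch, the STOP marker and the drainUntilStop method are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class LoopingConsumerSketch {
    // Hypothetical sentinel that tells the consumer to stop.
    static final String STOP = "__STOP__";

    // Takes elements in FIFO order until the sentinel is seen
    // and returns everything that was consumed before it.
    static List<String> drainUntilStop(BlockingQueue<String> queue)
            throws InterruptedException {
        List<String> seen = new ArrayList<>();
        while (true) {
            String item = queue.take(); // blocks while the queue is empty
            if (STOP.equals(item)) {
                break;
            }
            seen.add(item);
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("1");
        queue.put("2");
        queue.put(STOP);
        System.out.println(drainUntilStop(queue)); // prints [1, 2]
    }
}
```

Because take() blocks, the loop does not burn CPU while the queue is empty, unlike a while(true) loop that keeps checking isEmpty().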