From my main method I start two threads, a producer and a consumer. Both contain a while(true) loop. The producer loop is a UDP server, so it does not need a sleep. My problem is with the consumer loop. It removes objects from the linked queue and passes them to a function for further processing. From what I have researched, it is not good practice to use Thread.sleep in a loop, because the O/S will sometimes not wake the thread at the end of the set time. If I remove the Thread.sleep, the application drags the CPU to 20 to 30% when it is idle.
class Producer implements Runnable {
    private DatagramSocket dsocket;
    FError fer = new FError();
    int port = 1548;
    ConcurrentLinkedQueue<String> queue;

    Producer(ConcurrentLinkedQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            // Create a socket to listen on the port.
            dsocket = new DatagramSocket(port);
            // Create a buffer to read datagrams into.
            byte[] buffer = new byte[30000];
            // Create a packet to receive data into the buffer
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            while (true) {
                try {
                    // Wait to receive a datagram
                    dsocket.receive(packet);
                    // Convert the contents to a string
                    String msg = new String(buffer, 0, packet.getLength());
                    int ltr = msg.length();
                    // System.out.println("MSG =" + msg);
                    if (ltr > 4) {
                        SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // dd/MM/yyyy
                        Date now = new Date();
                        String strDate = sdfDate.format(now);
                        // System.out.println(strDate);
                        queue.add(msg + "&" + strDate);
                        // System.out.println("MSG =" + msg);
                    }
                    // Reset the length of the packet before reusing it.
                    packet.setLength(buffer.length);
                } catch (IOException e) {
                    fer.felog("svr class", "producer", "producer thread", e.getClass().getName() + ": " + e.getMessage());
                    dsocket.close();
                    break;
                }
            }
        } catch (SocketException e) {
            fer.felog("svr class", "producer", "Another App using the udp port " + port, e.getClass().getName() + ": " + e.getMessage());
        }
    }
}
class Consumer implements Runnable {
    String str;
    ConcurrentLinkedQueue<String> queue;

    Consumer(ConcurrentLinkedQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                while ((str = queue.poll()) != null) {
                    call(str); // do further processing
                }
            } catch (IOException e) {
                ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
                break;
            }
            try {
                Thread.sleep(500);
            } catch (InterruptedException ex) {
                ferpt.felog("svr class", "consumer", "sleep", ex.getClass().getName() + ": " + ex.getMessage());
            }
        }
    }
}
interrupt() method: If a thread is sleeping or waiting, calling the interrupt() method of the Thread class on it ends that state early and makes the blocked call throw InterruptedException.
When using Thread.sleep(), the wait time has to be specified in advance, and there is no guarantee that the element will be displayed within that time; it may take more than 5 seconds to load, in which case the script would fail again.
Java static code analysis: "wait(...)" should be used instead of "Thread.sleep(...)" when a lock is held.
Thread.sleep is bad! It blocks the current thread and renders it unusable for further work.
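To make the interrupt() behaviour described above concrete, here is a minimal sketch. The class and method names are made up for this example; it only shows that a thread blocked in Thread.sleep() wakes up with InterruptedException as soon as another thread interrupts it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptSleepSketch {

    /** Starts a thread that sleeps for a minute, interrupts it, and reports whether it woke early. */
    static boolean sleeperWasInterrupted() {
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(60_000); // would block for a minute if nobody interrupted us
            } catch (InterruptedException e) {
                interrupted.set(true); // sleep() was cut short by interrupt()
            }
        });
        sleeper.start();
        sleeper.interrupt(); // wakes the sleeper, or makes its sleep() throw immediately
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the caller's interrupt status
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("Sleeper interrupted: " + sleeperWasInterrupted());
    }
}
```

Note that the question's consumer only logs the InterruptedException and keeps looping, so an interrupt would not stop it; breaking out of the loop in that catch block is the usual way to make the thread stoppable.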
Instead of making Consumer implement Runnable, you could change your code to use a ScheduledExecutorService that polls the queue every half a second instead of making the thread sleep. An example of this would be:
public void schedule() {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.scheduleAtFixedRate(() -> {
        String str;
        try {
            while ((str = queue.poll()) != null) {
                call(str); // do further processing
            }
        } catch (IOException e) {
            ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
        }
    }, 0, 500, TimeUnit.MILLISECONDS);
}
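Two details of the scheduled approach above are easy to miss: an uncaught exception in the task suppresses all later runs, and the executor thread keeps the JVM alive until it is shut down. The following is a rough sketch of how both could be handled. The class name, the method drainPeriodically and the 50 ms delay are assumptions for illustration, and adding to a list stands in for the question's call(str).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ScheduledDrainSketch {

    /** Drains the queue on a schedule until 'expected' items were seen (or 5 seconds passed). */
    static List<String> drainPeriodically(Queue<String> queue, int expected) {
        List<String> processed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(expected);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        executor.scheduleWithFixedDelay(() -> {
            try {
                String str;
                while ((str = queue.poll()) != null) {
                    processed.add(str); // stand-in for call(str)
                    done.countDown();
                }
            } catch (RuntimeException e) {
                // Without this catch, one failure would silently cancel every future run.
                System.err.println("drain failed: " + e);
            }
        }, 0, 50, TimeUnit.MILLISECONDS);

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow(); // otherwise the scheduler thread keeps the JVM running
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("first");
        queue.add("second");
        System.out.println(drainPeriodically(queue, 2));
    }
}
```

scheduleWithFixedDelay is used here instead of scheduleAtFixedRate so that the delay is measured from the end of one drain to the start of the next; either method accepts the same arguments.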
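The proper solution to your problem is to use a blocking queue. A consumer that calls take() blocks without using CPU while the queue is empty, and a queue with limited capacity makes a fast producer wait instead of letting the queue grow without bound.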
Here is a small demo, which you can play with:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProdConsTest {
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        final Runnable producer = () -> {
            for (int i = 0; i < 1000; i++) {
                try {
                    System.out.println("Producing: " + i);
                    queue.put(i);
                    // Adjust production speed by modifying the sleep time
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // someone signaled us to terminate
                    break;
                }
            }
        };
        final Runnable consumer = () -> {
            while (true) {
                final Integer integer;
                try {
                    // Uncomment to simulate slow consumer:
                    // Thread.sleep(1000);
                    integer = queue.take();
                } catch (InterruptedException e) {
                    // someone signaled us to terminate
                    break;
                }
                System.out.println("Consumed: " + integer);
            }
        };
        final Thread consumerThread = new Thread(consumer);
        consumerThread.start();
        final Thread producerThread = new Thread(producer);
        producerThread.start();
        producerThread.join();
        consumerThread.interrupt();
        consumerThread.join();
    }
}
Now uncomment the sleep() in the consumer and observe what happens with the application. With the bounded queue, put() blocks the producer once all 10 slots are full, so it slows down to the consumer's pace. If you were using a timer-based solution such as the proposed ScheduledExecutorService, or you were busy waiting, then with a fast producer the queue would grow uncontrollably and eventually crash your application.
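Applied to the question's setup, the same idea could look roughly like the sketch below. It is not the asker's code: the class name, the capacity of 1000, the CountDownLatch used to end the demo, and the list that stands in for call(str) are all assumptions. In the real application the UDP producer would call put() (or offer(), if dropping datagrams is preferable to blocking the receive loop) and the consumer would simply loop on take().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingConsumerSketch {

    /** Pushes the messages through a bounded queue and returns what the consumer processed. */
    static List<String> runDemo(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000); // capacity is arbitrary
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(messages.size());

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String msg = queue.take(); // blocks without spinning while the queue is empty
                    processed.add(msg);        // stand-in for call(msg)
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // asked to stop: leave the loop
            }
        });
        consumer.start();

        try {
            for (String msg : messages) {
                queue.put(msg); // blocks only if the queue is full
            }
            done.await(5, TimeUnit.SECONDS); // wait until everything was consumed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.interrupt(); // no sleep loop needed to shut the consumer down
            try {
                consumer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        System.out.println(runDemo(Arrays.asList("msg1", "msg2", "msg3")));
    }
}
```

Because there is a single consumer and the queue is FIFO, messages are processed in the order they were queued, and the consumer thread uses no CPU while it waits in take().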