I have a method that listens for UDP packets in a while loop. As packets arrive, I want to parse them with a method in a different class and run several kinds of parsing and analysis on each packet in another part of the application. I think it would be better to have the PacketParser methods process a Queue outside of the loop. Would it be possible to add the packets to a Queue as they come in, and have another part of the application watch for new items and process them while the original while loop keeps listening and adding packets to the queue? Is there something in Java for monitoring a Queue or Stack? Is there a better way to do this?
public void read(String multicastIpAddress, int multicastPortNumber) {
    PacketParser parser = new PacketParser(logger);
    InetAddress multicastAddress = null;
    MulticastSocket multicastSocket = null;
    final int PortNumber = multicastPortNumber;
    try {
        multicastAddress = InetAddress.getByName(multicastIpAddress);
        multicastSocket = new MulticastSocket(PortNumber);
        String hostname = InetAddress.getLocalHost().getHostName();
        byte[] buffer = new byte[8192];
        multicastSocket.joinGroup(multicastAddress);
        System.out.println("Listening from " + hostname + " at " + multicastAddress.getHostName());
        int numberOfPackets = 0;
        while (true) {
            numberOfPackets++;
            DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
            multicastSocket.receive(datagramPacket);
            // add to queue for another function to process the packets
        }
    } catch (SocketException socketException) {
        System.out.println("Socket exception " + socketException);
    } catch (IOException exception) {
        System.out.println("Exception " + exception);
    } finally {
        if (multicastSocket != null) {
            try {
                multicastSocket.leaveGroup(multicastAddress);
                multicastSocket.close();
            } catch (IOException exception) {
                System.out.println(exception.toString());
            }
        }
    }
}
OK, so I did some reading about the producer-consumer pattern and figured it out. Here is what I did.
The producer-consumer pattern involves three things: a producer, a consumer, and a shared queue. In this context, PacketReader is the producer, which takes in network packets and places them into the shared queue. PacketParser is the consumer, which processes the packets in the shared queue. I created an instance of LinkedBlockingQueue and passed that shared queue into an instance of the producer (PacketReader) and an instance of the consumer (PacketParser). The producer and consumer instances are then each passed into an instance of the Thread class. Finally, the start() method is called on each thread instance.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {
    public static void main(String[] args) {
        // Shared queue handed to both the producer and the consumer
        BlockingQueue<Packet> queue = new LinkedBlockingQueue<>();
        ILogger logger = Injector.getLogger();
        Thread reader = new Thread(new PacketReader(logger, queue, "239.1.1.1", 49410));
        Thread parser = new Thread(new PacketParser(logger, queue));
        reader.start();
        parser.start();
    }
}
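The reason for using a LinkedBlockingQueue is that its put() method blocks the calling thread while the queue is full, and take() blocks the calling thread while the queue is empty. Note that a LinkedBlockingQueue created without a capacity argument is effectively unbounded (its capacity is Integer.MAX_VALUE), so put() only blocks in practice when a capacity is passed to the constructor. The producer and consumer classes need to implement the Runnable interface, which declares a single method named run() that takes no parameters.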
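To make the blocking behaviour concrete, here is a minimal sketch that uses a bounded queue. The class name BlockingQueueDemo, the capacity of 2, and the 100 ms timeouts are arbitrary choices for illustration, not part of the original code. It uses the timed offer() and poll() variants so the demo returns instead of waiting forever the way put() and take() would.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {

    // Fills a queue of capacity 2, then tries to add a third element.
    // Returns whether the third element was accepted.
    static boolean offerToFullQueue() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.put("a");
        queue.put("b");
        // The queue is full: put("c") would block here until a consumer
        // removed an element. The timed offer() gives up instead.
        return queue.offer("c", 100, TimeUnit.MILLISECONDS);
    }

    // Polls an empty queue. take() would block until a producer added
    // an element; the timed poll() returns null after the timeout.
    static String pollFromEmptyQueue() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        return queue.poll(100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("third element accepted: " + offerToFullQueue());
        System.out.println("element from empty queue: " + pollFromEmptyQueue());
    }
}
```

Whether a bounded queue is appropriate depends on the application: with a bound, a slow parser eventually stalls the reader; without one, the queue can grow until memory runs out.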
Consumer class
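import java.util.concurrent.BlockingQueue;

public class PacketParser implements Runnable {
    private final ILogger logger;
    private final BlockingQueue<Packet> queue;
    // volatile so that a stop() call from another thread is seen by run()
    private volatile boolean running = true;

    public PacketParser(ILogger logger, BlockingQueue<Packet> queue) {
        this.logger = logger;
        this.queue = queue;
    }

    public void stop() {
        running = false;
    }

    @Override
    public void run() {
        while (running) {
            try {
                // Blocks until a packet is available
                Packet packet = queue.take();
                parse(packet);
            } catch (InterruptedException exception) {
                logger.Log(exception.toString());
                // Restore the interrupt flag and leave the loop
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    private void parse(Packet packet) {
        // Placeholder: the parsing logic was not shown in the original post
    }
}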
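One thing to keep in mind with a stop flag: a consumer that is blocked in take() on an empty queue does not re-check the flag until another item arrives. The following is a minimal sketch of one alternative, assuming shutdown is driven by Thread.interrupt() instead. The names InterruptibleConsumerDemo, Consumer, and runAndStop are hypothetical, and the consumer counts strings as a stand-in for parsing packets.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class InterruptibleConsumerDemo {

    // Hypothetical stand-in for PacketParser: counts items instead of parsing.
    static class Consumer implements Runnable {
        private final BlockingQueue<String> queue;
        final AtomicInteger processed = new AtomicInteger();

        Consumer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    queue.take(); // throws InterruptedException when interrupted
                    processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // Restore the flag so callers can see the thread was interrupted
                Thread.currentThread().interrupt();
            }
        }
    }

    // Starts a consumer, feeds it two items, then stops it with interrupt().
    static int runAndStop() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Consumer consumer = new Consumer(queue);
        Thread thread = new Thread(consumer);
        thread.start();

        queue.put("one");
        queue.put("two");
        while (consumer.processed.get() < 2) {
            Thread.sleep(10); // wait for both items to be consumed
        }

        thread.interrupt(); // wakes the consumer even though the queue is empty
        thread.join(1000);
        if (thread.isAlive()) {
            throw new IllegalStateException("consumer did not stop");
        }
        return consumer.processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("items processed before shutdown: " + runAndStop());
    }
}
```

Another common option is a "poison pill": the producer enqueues a special sentinel object, and the consumer exits when it takes that object from the queue.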
Producer class
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.SocketException;
import java.util.concurrent.BlockingQueue;

public class PacketReader implements Runnable {
    private final ILogger logger;
    private final BlockingQueue<Packet> queue;
    private final String multicastIpAddress;
    private final int multicastPortNumber;
    // volatile so that a stop() call from another thread is seen by run()
    private volatile boolean running = true;

    public PacketReader(ILogger logger, BlockingQueue<Packet> queue, String multicastIpAddress, int multicastPortNumber) {
        this.logger = logger;
        this.queue = queue;
        this.multicastIpAddress = multicastIpAddress;
        this.multicastPortNumber = multicastPortNumber;
    }

    public void stop() {
        running = false;
    }

    @Override
    public void run() {
        InetAddress multicastAddress = null;
        MulticastSocket multicastSocket = null;
        try {
            multicastAddress = InetAddress.getByName(multicastIpAddress);
            multicastSocket = new MulticastSocket(multicastPortNumber);
            String hostname = InetAddress.getLocalHost().getHostName();
            multicastSocket.joinGroup(multicastAddress);
            System.out.println("Listening from " + hostname + " at " + multicastAddress.getHostName());
            int numberOfPackets = 0;
            while (running) {
                // Use a fresh buffer per packet: DatagramPacket only wraps the array,
                // so a shared buffer would be overwritten by the next receive()
                byte[] buffer = new byte[8192];
                DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
                multicastSocket.receive(datagramPacket);
                numberOfPackets++;
                // put() blocks if a bounded queue is full
                queue.put(new Packet(numberOfPackets, datagramPacket));
            }
        } catch (InterruptedException exception) {
            // Interrupted while waiting on the queue: restore the flag and exit
            Thread.currentThread().interrupt();
        } catch (SocketException socketException) {
            System.out.println("Socket exception " + socketException);
        } catch (IOException exception) {
            System.out.println("Exception " + exception);
        } finally {
            if (multicastSocket != null) {
                try {
                    multicastSocket.leaveGroup(multicastAddress);
                    multicastSocket.close();
                } catch (IOException exception) {
                    System.out.println(exception.toString());
                }
            }
        }
    }
}
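One detail worth checking in any receive loop like this: a DatagramPacket only wraps the byte array it is given, so if one buffer is reused between receive() calls, packets that are already in the queue would see their bytes overwritten. Below is a minimal sketch of copying the payload out before queueing. The class PayloadCopyDemo and the helper copyPayload are hypothetical names, and the second write into the buffer simulates what a later receive() would do.

```java
import java.net.DatagramPacket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PayloadCopyDemo {

    // Copies only the received bytes (offset and length) out of the packet's buffer.
    static byte[] copyPayload(DatagramPacket packet) {
        int start = packet.getOffset();
        return Arrays.copyOfRange(packet.getData(), start, start + packet.getLength());
    }

    static String demo() {
        byte[] buffer = new byte[16];

        // Simulate a first datagram arriving in the shared buffer
        byte[] first = "first".getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(first, 0, buffer, 0, first.length);
        DatagramPacket packet = new DatagramPacket(buffer, first.length);
        byte[] snapshot = copyPayload(packet);

        // Simulate a second datagram overwriting the same buffer
        byte[] second = "other".getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(second, 0, buffer, 0, second.length);

        String copied = new String(snapshot, StandardCharsets.US_ASCII);
        String wrapped = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.US_ASCII);
        return copied + "/" + wrapped;
    }

    public static void main(String[] args) {
        // The copy keeps the first payload; the packet view reflects the overwrite
        System.out.println(demo());
    }
}
```

Copying has the side benefit that each queued item holds only the bytes actually received, not a full-size buffer.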