Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Process items in queue as the items are added inside a while loop

Tags:

java

I have a method where I listen for UDP packets in a while loop. I want to parse the packets using another method in a different class as they arrive and do many different parsing and analyzing of each packet in another part of the application. I am thinking it would be better to have the PacketParser methods process the Queue outside of the loop. Would it be possible to just add the packets to a Queue as they come in and then have another part of the application listen for items as they come into the Queue and perform other actions as the original while loop keeps listening for packets and adds them to the queue? I would like to have another function monitor the queue and process the packets, is there something in Java to monitor a Queue or Stack? Is there a better way to do this?

public void read(String multicastIpAddress, int multicastPortNumber) {
        PacketParser parser = new PacketParser(logger);
        InetAddress multicastAddress = null;
        MulticastSocket multicastSocket = null;
        final int PortNumber = multicastPortNumber;
        try {
            multicastAddress = InetAddress.getByName(multicastIpAddress);
            multicastSocket = new MulticastSocket(PortNumber);
            String hostname = InetAddress.getLocalHost().getHostName();
            byte[] buffer = new byte[8192];
            multicastSocket.joinGroup(multicastAddress);
            System.out.println("Listening from " + hostname + " at " + multicastAddress.getHostName());
            int numberOfPackets = 0;
            while (true) {
                numberOfPackets++;
                DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
                multicastSocket.receive(datagramPacket);
                // add to queue for another function to process the packets

            }
        } catch (SocketException socketException) {
            System.out.println("Socket exception " + socketException);
        } catch  (IOException exception) {
            System.out.println("Exception " + exception);
        } finally {
            if (multicastSocket != null) {
                try {
                    multicastSocket.leaveGroup(multicastAddress);
                    multicastSocket.close();
                } catch (IOException exception) {
                    System.out.println(exception.toString());
                }
            }
        }
    }
like image 816
Jonathan Kittell Avatar asked Jan 14 '16 16:01

Jonathan Kittell


1 Answers

Ok, so I did some reading about the producer-consumer pattern and figured it out so here is what I did.

Basically the producer-consumer pattern involves three things: a producer, a consumer and a shared queue. In this context the PacketReader is the producer that takes in network packets and places them into the shared queue. The PacketParser is the consumer who processes the packets in the shared queue. So I created an instance of a LinkedBlockingQueue and passed that shared queue into an instance of the consumer (PacketReader) and an instance of the producer (PacketParser). Then the consumer and producer instances are each passed into an instance of the Thread class. Finally, call the start() method on each thread instance.

public class Main {

    public static void main(String[] args) {
        BlockingQueue<Packet> queue = new LinkedBlockingQueue<>();
        ILogger logger = Injector.getLogger();

        Thread reader = new Thread(new PacketReader(logger, queue, "239.1.1.1", 49410));
        Thread parser = new Thread(new PacketParser(logger, queue));

        reader.start();
        parser.start();
    }
}

The reason to use the LinkedBlockingQueue is because the put() method will block the queue if full and take() will block if queue if empty. The producer and consumer classes need to implement the Runnable interface and contain a method named run() that takes no parameters.

Consumer class

public class PacketParser implements Runnable {

    private ILogger logger;
    private BlockingQueue<Packet> queue;
    private boolean running = true;

    public PacketParser(ILogger logger, BlockingQueue<Packet> queue) {
        this.logger = logger;
        this.queue = queue;
    }

    public void stop() {
        running = false;
    }

    public void run() {
        while (running) {
            Packet packet;
            try {
                packet = queue.take();
                parse(packet);
            } catch (InterruptedException exception) {
                logger.Log(exception.getStackTrace().toString());
            }
        }
    }

Producer class

public class PacketReader implements Runnable {

    private ILogger logger;
    private final Queue<Packet> queue;
    private String multicastIpAddress;
    private int multicastPortNumber;
    private boolean running = true;

    public PacketReader(ILogger logger, Queue<Packet> queue, String multicastIpAddress, int multicastPortNumber) {
        this.logger = logger;
        this.queue = queue;
        this.multicastIpAddress = multicastIpAddress;
        this.multicastPortNumber = multicastPortNumber;
    }

    public void stop() {
        running = false;
    }

    public void run() {
        InetAddress multicastAddress = null;
        MulticastSocket multicastSocket = null;
        try {
            multicastAddress = InetAddress.getByName(multicastIpAddress);
            multicastSocket = new MulticastSocket(multicastPortNumber);
            String hostname = InetAddress.getLocalHost().getHostName();
            byte[] buffer = new byte[8192];
            multicastSocket.joinGroup(multicastAddress);
            System.out.println("Listening from " + hostname + " at " + multicastAddress.getHostName());
            int numberOfPackets = 0;

            while (running) {
                numberOfPackets++;
                DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
                multicastSocket.receive(datagramPacket);
                Packet packet = new Packet(numberOfPackets, datagramPacket);
                queue.add(packet);
            }
        } catch (SocketException socketException) {
            System.out.println("Socket exception " + socketException);
        } catch  (IOException exception) {
            System.out.println("Exception " + exception);
        } finally {
            if (multicastSocket != null) {
                try {
                    multicastSocket.leaveGroup(multicastAddress);
                    multicastSocket.close();
                } catch (IOException exception) {
                    System.out.println(exception.toString());
                }
            }
        }
    }
}
like image 75
Jonathan Kittell Avatar answered Nov 09 '22 02:11

Jonathan Kittell