Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java IPC w/Sockets - Using Loopback Device

Tags:

java

sockets

ipc

So I'm trying to implement socket communication between processes for a project, but I can't seem to connect to any port via the loopback device. Am I missing something here? I've let this run through almost 500 ports, and the connection is always refused.

static final String HOST = "127.0.0.1";

/**
 * Probes successive ports on the loopback device, starting at 1000, until a
 * connection is accepted, then wraps the socket's streams.
 * (socket, writer and reader are declared in the full program.)
 */
public static void main(String[] args) throws IOException {
    int port = 1000;
    while (true) {
        try {
            // Connect to the port currently being probed; using a fixed
            // constant here would retry the same port on every iteration.
            socket = new Socket(HOST, port); // initializing the socket
            writer = new OutputStreamWriter(socket.getOutputStream());
            reader = new InputStreamReader(socket.getInputStream());
            break;
        } catch (ConnectException ex) {
            System.out.println("failure on port: " + port);
            ++port; // increment port to try next
        }
    }

    ...

};

Here is the entire program if anyone wants to see the declarations and whatnot.

package socket_ipc;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.Socket;

/**
 * Producer/consumer exercise that tries to pass NUMBER_OF_MESSAGES values
 * through a socket on the loopback device and then compares what was produced
 * with what was consumed.
 *
 * NOTE(review): this class only ever opens a client Socket; nothing in it
 * creates a server socket or listens on a port. A connection attempt can
 * therefore only succeed if some other process is already listening on the
 * port being tried.
 */
public class Socket_IPC {
    static final int NUMBER_OF_MESSAGES = 100; // number of messages to pass
    static int[] PRODUCED_MSSG = new int[NUMBER_OF_MESSAGES]; // for comparing
    static int[] CONSUMED_MSSG = new int[NUMBER_OF_MESSAGES]; // for comparing

    static final String HOST = "127.0.0.1";    // IP address of loopback device
    static final int PORT = 1000;              // first port tried by main()

    // All three are assigned in main() before the worker threads are started.
    static OutputStreamWriter writer;   // write to socket
    static InputStreamReader reader;    // read from socket
    static Socket socket;               // the socket

    /**
     * Generates NUMBER_OF_MESSAGES random values in the range 0..255, records
     * each in PRODUCED_MSSG and writes it to the shared writer.
     */
    private static class s_Producer extends Thread {
        @Override
        public void run() {
            for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                try {
                    PRODUCED_MSSG[i] = (int)(Math.random() * 256); // get data
                    // Writer.write(int) writes one character, not a 4-byte int.
                    // The writer is never flushed in this class.
                    writer.write(PRODUCED_MSSG[i]); // write data to the socket
                } catch (IOException ex) {
                    // The failure is only logged; the loop moves on to the next message.
                    System.err.println(ex.toString());
                }
            }
        }
    }

    /**
     * Reads NUMBER_OF_MESSAGES characters from the shared reader and stores
     * them in CONSUMED_MSSG.
     *
     * NOTE(review): reader wraps the input stream of the same socket that the
     * producer writes to, so it yields whatever the remote peer sends back,
     * not the producer's own output.
     */
    private static class s_Consumer extends Thread {
        @Override
        public void run() {
            for(int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                try {
                    int data = reader.read();   // obtain data from the socket
                    CONSUMED_MSSG[i] = data;    // put retrieved data in array
                } catch (IOException ex) {
                    // The failure is only logged; CONSUMED_MSSG[i] keeps its default of 0.
                    System.err.println(ex.toString());
                }
            }
        }
    }

    /**
     * Connects to the first port at or above PORT that accepts a connection,
     * runs the producer and consumer threads to completion, then prints and
     * compares the two message arrays.
     *
     * @param args unused
     * @throws IOException if opening the socket or its streams fails with
     *         anything other than a ConnectException
     */
    public static void main(String[] args) throws IOException {
        int port = PORT; // beginning at 1000
        // NOTE(review): the scan has no upper bound; the Socket constructor
        // rejects ports above 65535 with an IllegalArgumentException.
        while (true) {
            try {
                socket = new Socket(HOST, port); // initializing the socket
                writer = new OutputStreamWriter(socket.getOutputStream());
                reader = new InputStreamReader(socket.getInputStream());
                break;
            } catch (ConnectException ex) {
                System.out.println("failure on port: " + port);
                ++port; // increment port to try next
            }
        }

        /* instantiating and starting the producer thread */
        s_Producer p = new s_Producer();
        p.start();

        /* instantiating and starting the consumer thread */
        s_Consumer c = new s_Consumer();
        c.start();

        try { /* joining threads to wait for completion */
            p.join();
            c.join();
        } catch (InterruptedException ex) {
            System.err.println(ex.toString());
        }

        /* print every pair and stop at the first mismatch */
        for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
            System.out.println(
                "[" + i + "]: " + PRODUCED_MSSG[i] + " == " + CONSUMED_MSSG[i]);
            if (PRODUCED_MSSG[i] != CONSUMED_MSSG[i]) {
                System.out.println("PROCESS SYNCHRONIZATION ERROR!");
            // Exits with status 0 even though a mismatch was detected.
            System.exit(0);
            }
        }

        System.out.println("PROCESS SYNCHRONIZATION SUCCESS!");

    }

};

1 Answer

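Well, I'm not sure exactly what caused the issue; it may well have been the use of a port number below 1024. (Note, however, that the original code never created a ServerSocket, so nothing was listening on any of the ports it tried, whereas the working version below does. A client Socket can only connect to a port that something is listening on.) Also, I eventually came across a great pair of wrapper streams, the DataOutputStream and DataInputStream classes. These made for a much easier solution by providing type-specific read and write operations, such as dis.readInt() and dos.writeInt(int n).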

Here is the completed and successfully running source code:

package socket_ipc;
import java.net.*;
import java.io.*;
/**
 * Socket Inter-Process Communication.
 *
 * <p>Implements the Producer-Consumer Problem, using Sockets for IPC. A
 * producer thread accepts one connection on a loopback server socket and
 * writes {@link #NUMBER_OF_MESSAGES} random ints to it; a consumer thread
 * connects to that socket and reads them back. {@code main} then compares the
 * two arrays element by element.
 *
 * <p>The process exits with status 1 if setup fails, if either thread reports
 * an I/O error, or if any produced/consumed pair differs.
 *
 * @author William Hatfield: CEG-4350-01 Fall 2015
 */
public class Socket_IPC {
    static final int NUMBER_OF_MESSAGES = 100; // number of messages to pass
    static int[] PRODUCED_MSSG = new int[NUMBER_OF_MESSAGES]; // for comparing
    static int[] CONSUMED_MSSG = new int[NUMBER_OF_MESSAGES]; // for comparing
    static final String HOST = "127.0.0.1"; // IP address of loopback device
    static final int PORT = 1234;           // arbitrary port number (local)
    static ServerSocket serverSocket;       // the shared server socket

    /** Upper bound on how long the producer waits for the consumer to connect. */
    private static final int ACCEPT_TIMEOUT_MS = 5000;

    /** Set by either worker when its transfer fails; read by main after join(). */
    private static volatile boolean transferFailed = false;

    /** Accepts the consumer's connection and sends it the random messages. */
    private static class Producer extends Thread {
        @Override
        public void run() {
            // try-with-resources closes the stream and the socket on every
            // path, which also signals end-of-stream to the consumer.
            try (Socket toClient = serverSocket.accept();
                 DataOutputStream dos =
                         new DataOutputStream(toClient.getOutputStream())) {
                for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                    PRODUCED_MSSG[i] =
                            (int) ((Math.random() - .5) * Integer.MAX_VALUE);
                    dos.writeInt(PRODUCED_MSSG[i]);
                }
                dos.flush();
            } catch (IOException ex) {
                transferFailed = true;
                System.err.println("Producer Error: " + ex.toString());
            }
        }
    }

    /** Connects to the producer and reads the messages it sends. */
    private static class Consumer extends Thread {
        @Override
        public void run() {
            try (Socket fromServer = new Socket(HOST, PORT);
                 DataInputStream dis =
                         new DataInputStream(fromServer.getInputStream())) {
                for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                    // readInt() throws EOFException (an IOException) if the
                    // producer closed the connection early.
                    CONSUMED_MSSG[i] = dis.readInt();
                }
            } catch (IOException ex) {
                transferFailed = true;
                System.err.println("Consumer Error: " + ex.toString());
            }
        }
    }

    /**
     * Runs one producer/consumer exchange over the loopback device and
     * verifies that every message arrived unchanged.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        transferFailed = false;
        // Bind to the loopback address only: this is local IPC, so the
        // listener should not be reachable from other hosts.
        try (ServerSocket listener =
                     new ServerSocket(PORT, 1, InetAddress.getByName(HOST))) {
            // Without a timeout the producer would block in accept() forever
            // if the consumer failed to connect, and join() would never return.
            listener.setSoTimeout(ACCEPT_TIMEOUT_MS);
            serverSocket = listener;
            Producer p = new Producer();    // create the producer thread
            Consumer c = new Consumer();    // create the consumer thread
            p.start();                      // start the producer thread
            c.start();                      // start the consumer thread
            p.join();                       // wait for producer thread
            c.join();                       // wait for consumer thread
        } catch (IOException ex) {
            System.err.println("Setup Error: " + ex.toString());
            System.exit(1);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            System.err.println("Interrupted Error: " + ex.toString());
            System.exit(1);
        }
        // Two untouched arrays are both all zeros and would compare equal, so
        // a failed transfer must not reach the comparison below.
        if (transferFailed) {
            System.err.println("Transfer Error: messages were not exchanged");
            System.exit(1);
        }
        /* compare produced and consumed data, exit if any match fails */
        for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
            System.out.print("[" + i + "]: " + PRODUCED_MSSG[i]);
            System.out.println(" == " + CONSUMED_MSSG[i]);
            if (PRODUCED_MSSG[i] != CONSUMED_MSSG[i]) {
                System.out.println("PROCESS SYNC ERROR AT INDEX: " + i);
                System.exit(1); // non-zero status so callers can detect the failure
            }
        }
        /* inform the user that synchronization was successful, then exit */
        System.out.println("SYNC SUCCESFUL!");
    }
}

If there are any improvements you would like to offer, I'm happy to listen. Especially to the die-hard Java coders, I'm sure there's something that goes against the standard Java practice in here!


Donate For Us

If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow! Thank you!