
Java networking: evented Socket/InputStream

I'm implementing an event-oriented layer over Java's Sockets, and I was wondering whether there is a way to determine if data is pending to be read.

My normal approach would be to read from the socket into a buffer and call the provided callbacks once the buffer holds more than a given number of bytes (which could be 0, if the callback needs to be fired every time anything arrives), but I suspect Java is already doing the buffering for me.

Is the available() method of InputStream reliable for this? Should I just read() and do my own buffering on top of the Socket? Or is there another way?

asked May 11 '11 19:05 by slezica



1 Answer

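In short, no. available() is not reliable for this (at least it was not for me): it only estimates how many bytes can be read right now without blocking, so a result of 0 does not tell you whether more data is on its way. I recommend using java.nio.channels.SocketChannel together with Selector and SelectionKey. This solution is somewhat event-based, but it is more complicated than plain sockets.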
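
To illustrate why available() is a weak signal, here is a minimal sketch over a loopback connection. The class and method names (AvailableDemo, probe) are made up for this example; only the java.net and java.io calls are real. It shows available() reporting 0 on a healthy connection just before data is sent, while a plain read() still delivers the byte.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical class name, for illustration only.
final class AvailableDemo {

    /** Returns {available() before anything was sent, first byte read afterwards}. */
    static int[] probe() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket peer = server.accept()) {
            InputStream in = client.getInputStream();
            // Nothing has been sent yet, so no bytes can be read without blocking.
            int before = in.available();
            peer.getOutputStream().write(new byte[] {42, 43, 44});
            peer.getOutputStream().flush();
            // read() blocks until the first byte has arrived.
            int first = in.read();
            return new int[] {before, first};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

An event layer that polls available() would have to spin or sleep between checks, which is what the selector approach avoids.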

For clients:

  1. Construct a socket channel (socket = SocketChannel.open();) and open a selector (selector = Selector.open();).
  2. Make the channel non-blocking: socket.configureBlocking(false);
  3. Register the channel with the selector for connection events: socket.register(selector, SelectionKey.OP_CONNECT);
  4. Connect: socket.connect(new InetSocketAddress(host, port));
  5. Wait for events: selector.select();
  6. If the event is a completed connection, call socket.finishConnect() and register the channel for OP_READ; if the event is readable data, just read from the channel.
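
The steps above can be sketched as one self-contained program. This is only an illustration under a few assumptions: the class name, readUntilClosed and demo are hypothetical, and the demo talks to a throwaway local server that sends "hello" and closes. It also handles the case where connect() completes immediately, which can happen for local connections.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical class name; only the java.nio calls come from the JDK.
final class SelectorClientSketch {

    /** Connects to the address and collects bytes until the peer closes. */
    static byte[] readUntilClosed(InetSocketAddress address) throws IOException {
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try (Selector selector = Selector.open();                    // step 1
             SocketChannel socket = SocketChannel.open()) {
            socket.configureBlocking(false);                         // step 2
            SelectionKey key =
                socket.register(selector, SelectionKey.OP_CONNECT);  // step 3
            if (socket.connect(address)) {                           // step 4
                // Connected immediately, so no OP_CONNECT event will follow.
                key.interestOps(SelectionKey.OP_READ);
            }
            while (socket.isOpen()) {
                selector.select();                                   // step 5
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey ready = it.next();
                    it.remove();
                    if (ready.isConnectable()) {                     // step 6: connected
                        if (socket.finishConnect()) {
                            ready.interestOps(SelectionKey.OP_READ);
                        }
                    } else if (ready.isReadable()) {                 // step 6: data
                        buffer.clear();
                        int n = socket.read(buffer);
                        if (n < 0) {
                            socket.close();  // end of stream, ends the outer loop
                        } else {
                            received.write(buffer.array(), 0, n);
                        }
                    }
                }
            }
        }
        return received.toByteArray();
    }

    /** Runs the client against a throwaway local server that sends "hello". */
    static String demo() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread peer = new Thread(() -> {
                try (Socket s = server.accept()) {
                    s.getOutputStream().write("hello".getBytes(StandardCharsets.UTF_8));
                } catch (IOException ignored) {
                    // the client would then just see an early end of stream
                }
            });
            peer.start();
            InetSocketAddress address =
                new InetSocketAddress(server.getInetAddress(), server.getLocalPort());
            return new String(readUntilClosed(address), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Switching the existing key with interestOps() rather than registering again is a design choice of this sketch; calling register() a second time, as in the code further down, updates the same key.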

However, to make this asynchronous you need a separate thread that checks whether something has arrived: even though the channel is non-blocking, selector.select() itself blocks the calling thread until an event occurs.
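
Because select() blocks, a stop flag alone is only noticed once the next event arrives. One way around this, sketched below with hypothetical names (StoppableSelectLoop, closeRequested, demo), is to call Selector.wakeup() after setting the flag so that the blocked select() returns right away.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical class name; a stripped-down select loop with no channels registered.
final class StoppableSelectLoop implements Runnable {
    private final Selector selector;
    // volatile so the loop thread sees the update made by close()
    private volatile boolean closeRequested;

    StoppableSelectLoop() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void run() {
        try {
            while (!closeRequested) {
                selector.select();               // blocks until an event or wakeup()
                selector.selectedKeys().clear(); // real event handling would go here
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Asks the loop to stop and interrupts a blocked (or the next) select(). */
    void close() {
        closeRequested = true;
        selector.wakeup();
    }

    /** Starts the loop, requests a stop, and reports whether the thread ended. */
    static boolean demo() {
        StoppableSelectLoop loop = new StoppableSelectLoop();
        Thread thread = new Thread(loop);
        thread.start();
        loop.close();
        try {
            thread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }
}
```

If wakeup() is called before the thread reaches select(), the next select() returns immediately, so the stop request is not lost either way.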

For servers, there is ServerSocketChannel and you use OP_ACCEPT for it.

For reference, here is my client code, which should give you a hint:

 private Thread readingThread = new ListeningThread();

 /**
  * Listening thread - reads messages in a separate thread so the application does not get blocked.
  */
 private class ListeningThread extends Thread {
  public void run() {
   running = true;
   try {
    while(!close) listen();
    messenger.close();
   }
   catch(ConnectException ce) {
    doNotifyConnectionFailed(ce);
   }
   catch(Exception e) {
//    e.printStackTrace();
    messenger.close();
   }
   running = false;
  }
 }

 /**
  * Connects to host and port.
  * @param host Host to connect to.
  * @param port Port of the host machine to connect to.
  */
 public void connect(String host, int port) {
  try {
   SocketChannel socket = SocketChannel.open();
   socket.configureBlocking(false);
   socket.register(this.selector, SelectionKey.OP_CONNECT);
   socket.connect(new InetSocketAddress(host, port));
  }
  catch(IOException e) {
   this.doNotifyConnectionFailed(e);
  }
 }

 /**
  * Waits for an event to happen, processes it and then returns.
  * @throws IOException when something goes wrong.
  */
 protected void listen() throws IOException {
  // see if there are any new things going on
  this.selector.select();
  // process events
  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  while(iter.hasNext()) {
   SelectionKey key = iter.next();
   iter.remove();
   // check validity
   if(key.isValid()) {
    // if connectable...
    if(key.isConnectable()) {
     // ...establish connection, make messenger, and notify everyone
     SocketChannel client = (SocketChannel)key.channel();
     // This is the tricky part: register for OP_READ only after the connection
     // is finished. Registering earlier makes the selector stop waiting for
     // incoming bytes, which results in 100% CPU usage very quickly.
     if(client!=null && client.finishConnect()) {
      client.register(this.selector, SelectionKey.OP_READ);
     }
    }
    // if readable, tell messenger to read bytes
    else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) {
     // read message here
    }
   }
  }
 }

 /**
  * Starts the client.
  */
 public void start() {
  // start a reading thread
  if(!this.running) {
   this.readingThread = new ListeningThread();
   this.readingThread.start();
  }
 }

 /**
  * Tells the client to close at nearest possible moment.
  */
 public void close() {
  this.close = true;
 }
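
The "read message here" branch is where the buffering from the question would plug in. As a rough sketch, assuming a hypothetical ThresholdBuffer helper (not part of the code above), bytes read from the channel are accumulated and a callback fires once more than a given number of bytes is pending. A threshold of 0 fires on every non-empty read.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.function.Consumer;

// Hypothetical helper, for illustration only.
final class ThresholdBuffer {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private final int threshold;
    private final Consumer<byte[]> callback;

    ThresholdBuffer(int threshold, Consumer<byte[]> callback) {
        this.threshold = threshold;
        this.callback = callback;
    }

    /**
     * Appends the remaining bytes of an already flipped buffer and fires the
     * callback with everything pending once more than threshold bytes are held.
     */
    void feed(ByteBuffer chunk) {
        byte[] bytes = new byte[chunk.remaining()];
        chunk.get(bytes);
        pending.write(bytes, 0, bytes.length);
        if (pending.size() > threshold) {
            byte[] data = pending.toByteArray();
            pending.reset();
            callback.accept(data);
        }
    }
}
```

In the readable branch this would be used roughly as: clear a ByteBuffer, call read() on the channel, close the channel if the result is -1, otherwise flip the buffer and pass it to feed().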

And for server:

 /**
  * Constructs a server.
  * @param port Port to listen to.
  * @throws IOException when something goes wrong.
  */
 public ChannelMessageServer(int port) throws IOException {
  this.server = ServerSocketChannel.open();
  this.server.configureBlocking(false);
  this.server.socket().bind(new InetSocketAddress(port));
  this.server.register(this.selector, SelectionKey.OP_ACCEPT);
 }

 /**
  * Waits for event, then exits.
  * @throws IOException when something goes wrong.
  */
 protected void listen() throws IOException {
  // see if there are any new things going on
  this.selector.select();
  // process events
  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  while(iter.hasNext()) {
   SelectionKey key = iter.next();
   // do something with the connected socket
   iter.remove();
   if(key.isValid()) this.process(key);
  }
 }

 /**
  * Processes a selection key.
  * @param key SelectionKey.
  * @throws IOException when something is wrong.
  */
 protected void process(SelectionKey key) throws IOException {
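  // if incoming connection
  if(key.isAcceptable()) {
   // get client; in non-blocking mode accept() returns null if no connection is pending
   SocketChannel client = ((ServerSocketChannel)key.channel()).accept();
   if(client!=null) {
    try {
     client.configureBlocking(false);
     client.register(this.selector, SelectionKey.OP_READ);
    }
    catch(IOException e) {
     // the client could not be set up, so drop the connection instead of leaking it
     client.close();
    }
   }
  }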
  // if readable, tell messenger to read
  else if(key.isReadable()) {
  // read
  }
 }
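
The "// read" branch is left open above. As one possible way to fill it in, here is a small self-contained echo server sketch. All names (EchoServerSketch, serveUntilFirstDisconnect, demo) are hypothetical, and for brevity it stops after the first client disconnects and writes replies in a simple loop rather than via OP_WRITE.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical class and method names, for illustration only.
final class EchoServerSketch {

    /** Echoes received bytes back until the first client disconnects. */
    static void serveUntilFirstDisconnect(ServerSocketChannel server) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try (Selector selector = Selector.open()) {
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            boolean done = false;
            while (!done) {
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {  // null if nothing is pending after all
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        buffer.clear();
                        int n = client.read(buffer);
                        if (n < 0) {
                            // read() returns -1 once the peer has closed its end.
                            client.close();    // closing also cancels the key
                            done = true;
                        } else {
                            buffer.flip();
                            // Simplistic: spins if the send buffer is full. A fuller
                            // version would queue the data and wait for OP_WRITE.
                            while (buffer.hasRemaining()) {
                                client.write(buffer);
                            }
                        }
                    }
                }
            }
        }
    }

    /** Starts the server on an ephemeral loopback port and echoes "ping" through it. */
    static String demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            int port = server.socket().getLocalPort();
            Thread serverThread = new Thread(() -> {
                try {
                    serveUntilFirstDisconnect(server);
                } catch (IOException ignored) {
                    // the client below would then time out on its read
                }
            });
            serverThread.start();
            byte[] reply = new byte[4];
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), port)) {
                client.setSoTimeout(5000);
                client.getOutputStream().write("ping".getBytes(StandardCharsets.UTF_8));
                new DataInputStream(client.getInputStream()).readFully(reply);
            }
            serverThread.join(5000);
            return new String(reply, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The important detail for an event layer is the -1 check: a closed connection shows up as a readable key, so a handler that ignores the return value of read() will keep being woken up for a dead channel.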

Hope this helps.

answered Oct 06 '22 00:10 by Miki