I had developed an java server using java nio
sockets. It is the code of my application:
public class EchoServer {
static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(Main.class);
private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 4664;
private InetAddress hostAddress = null;
private int port;
private String ipAddress = "my ip";
private Selector selector;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
int timestamp = 1;
HashMap<Integer, String> connectedClients = new HashMap<Integer, String>();
HashMap<String, Integer> clientIds= new HashMap<String,Integer>();
HashMap<String, String> messageToClients = new HashMap<String, String>();
public EchoServer() {
this(DEFAULT_PORT);
}
public EchoServer(int port) {
try{
this.port = port;
hostAddress = InetAddress.getByName(ipAddress);
selector = initSelector();
loop();
}catch(Exception ex){
logger.error("Exception Accoured:",ex);
}
}
private Selector initSelector() {
try{
Selector socketSelector = SelectorProvider.provider().openSelector();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
serverChannel.socket().bind(isa);
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}catch(Exception ex){
logger.error("Exception Accoured:",ex);
return null;
}
}
private void loop() {
while (true) {
try {
// Do defined operations for clients
// ------------------------------
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys()
.iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
logger.warn(key.hashCode() + "- is invalid");
continue;
}
// Check what event is available and deal with it
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
// Fetch List from server
// -----------------------------------------
try {
ResultSet resultset = DataBase.getInstance()
.getQueryResult();
boolean flag = false;
while (resultset.next()) {
String mobileNumber = resultset.getString("MobileNo");
String message = resultset.getInt("IsMessage") + ","
+ resultset.getInt("IsDeliver") + ","
+ resultset.getInt("IsGroup") + ","
+ resultset.getInt("IsSeen");
messageToClients.put(mobileNumber, message);
}
} catch (Exception ex) {
//ex.printStackTrace();
logger.error("Exception Accoured:",ex);
}
// Wait for 1 second
// -----------------------------------------------
Thread.sleep(1000);
timestamp++;
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
private void accept(SelectionKey key) {
try{
// Initialize the connection ------------------------------------------
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key
.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
logger.info("New client accepted");
// Fire read for reading phone number --------------------------------
socketChannel.register(selector, SelectionKey.OP_READ);
}catch(Exception ex){
logger.error("Exception Accoured:",ex);
}
}
private void read(SelectionKey key) {
try{
// Initialize Socket -----------------------------------------------------
SocketChannel socketChannel = (SocketChannel) key.channel();
// Reading Client Number -------------------------------------------------
readBuffer.clear();
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (IOException e) {
logger.error("Forceful shutdown");
key.cancel();
return;
}
// read was not successful
if (numRead == -1) {
logger.error("Graceful shutdown");
key.cancel();
return;
}
// read was successful and now we can write it to String
readBuffer.flip();
byte[] bytes = new byte[readBuffer.limit()];
readBuffer.get(bytes);
String number = new String(bytes);
number = number.replace("\r\n", "");
number = number.trim();
// Update Connect Clients Status -----------------------------------------
Integer clientId=clientIds.get(number);
if ( clientId == null) {
connectedClients.put(key.hashCode(), number);
clientIds.put(number, key.hashCode());
logger.error(number + "- (" + key.hashCode() + ") has Connected");
}else{
connectedClients.remove(clientId);
connectedClients.put(key.hashCode(), number);
clientIds.put(number, key.hashCode());
logger.error(number + "- (" + key.hashCode() + ") REconnected");
}
//System.err.println("All clients number are:" + connectedClients.size());
logger.error("All clients number are:" + connectedClients.size());
// Fire Write Operations -------------------------------------------------
socketChannel.register(selector, SelectionKey.OP_WRITE);
}catch(Exception ex){
//ex.printStackTrace();
logger.error("Exception Accoured:",ex);
}
}
private void write(SelectionKey key) {
try {
//Check channel still alive ----------------------------------------------
String clientNumber = connectedClients.get(key.hashCode());
if(clientNumber == null){
key.cancel();
return;
}
// Get Channel -----------------------------------------------------------
SocketChannel socketChannel = (SocketChannel) key.channel();
// Send Message if client number have new message ------------------------
if (messageToClients.get(clientNumber) != null) {
logger.info(clientNumber + "-" + key.hashCode()
+ "- Sent write message");
String timeStamp = String.valueOf(timestamp);
String message = messageToClients.get(clientNumber);
ByteBuffer dummyResponse = ByteBuffer.wrap((message + "\r\n").getBytes("UTF-8"));
socketChannel.write(dummyResponse);
messageToClients.remove(clientNumber);
}
// Fire new write state --------------------------------------------------
socketChannel.register(selector, SelectionKey.OP_WRITE);
} catch (IOException iox) {
logger.error("Exception Accoured:",iox);
String number = connectedClients.get(key.hashCode());
clientIds.remove(number);
connectedClients.remove(key.hashCode());
key.cancel();
} catch(Exception ex){
logger.error("Exception Accoured:",ex);
}
}
}
When I am testing with 2-3 clients it is working fine but when I start testing it with about 100-300 client I recived below exception at several times(Actually it is happeingin on write()
method and line socketChannel.write(dummyResponse);
:
java.io.IOException: An established connection was aborted by the software in your host machine
at sun.nio.ch.SocketDispatcher.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(Unknown Source)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(Unknown Source)
at sun.nio.ch.IOUtil.write(Unknown Source)
at sun.nio.ch.SocketChannelImpl.write(Unknown Source)
at net.behboodi.testserver.EchoServer.write(EchoServer.java:274)
at net.behboodi.testserver.EchoServer.loop(EchoServer.java:106)
at net.behboodi.testserver.EchoServer.<init>(EchoServer.java:56)
at net.behboodi.testserver.EchoServer.<init>(EchoServer.java:47)
at net.behboodi.testserver.Main.main(Main.java:44)
and then I can not receive messages from the server.
When you get end of stream (read()
returns -1) you aren't closing the channel, so you're leaking channels. You need to close the channel, and note that doing so will automatically cancel the key. Just cancelling the key by itself is not sufficient. Same when you cancel the key because clientNumber == null
or you get an IOException
.
NB you shouldn't register OP_WRITE
unless you have just received zero from a write()
call, and you should deregister it whenever you don't get that zero.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With