I have created a simple server client application using java NIO. I used a single selector there for accepting connection, reading data and writing. But I want an application where 1 selector will be busy in accepting the connection while the 2nd selector will read the data and the 3rd selector will write the data.
Means I donot want to put all the load into single selector.
How to achieve this? Is there any online help
Thanks Deepak.
// Create the selector Selector selector = Selector.open();
// Create two non-blocking server sockets on 80 and 81
ServerSocketChannel ssChannel1 = ServerSocketChannel.open();
ssChannel1.configureBlocking(false);
ssChannel1.socket().bind(new InetSocketAddress(80));
// Register both channels with selector
ssChannel1.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// Wait for an event
selector.select();
// Get list of selection keys with pending events
Iterator it = selector.selectedKeys().iterator();
// Process each key
while (it.hasNext()) {
// Get the selection key
SelectionKey selKey = (SelectionKey)it.next();
// Remove it from the list to indicate that it is being processed
it.remove();
// Check if it's a connection request
if (selKey.isAcceptable()) {
// Get channel with connection request
ServerSocketChannel ssChannel = (ServerSocketChannel)selKey.channel();
// Accepting a Connection on a ServerSocketChannel
SocketChannel sChannel = serverSocketChannel.accept();
// If serverSocketChannel is non-blocking, sChannel may be null
if (sChannel == null) {
// There were no pending connection requests; try again later.
// To be notified of connection requests,
} else {
// Use the socket channel to communicate with the client
}
}
}
}
Usually, on a non-blocking tcp server, first accept, then read, then write,
you need to register the selector in this order to make sense.
Here is a full example of non-blocking io:
TcpChannelTest.java: (a TestNG testing class)
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import org.testng.annotations.Test;
/**
* tcp channel test
*
* @author eric
* @date Sep 2, 2012 9:17:40 PM
*/
public class TcpChannelTest {
public String serverHost = "localhost";
public int serverPort = 12345;
private ServerSocketChannel server;
private int clientSerial = 0;
private int clientCount = 5;
// test tcp non-blocking channel,
@Test
public void testTcpNonBlockingChanne() throws IOException, InterruptedException {
// start server
startServerNonBlocking();
Thread.sleep(500); // wait server to be ready, before start client,
// start clients
for (int i = 0; i < clientCount; i++) {
startClientOnce();
}
// shutdown server,
Thread.sleep(500); // wait client to be handled,
shutdownServer();
}
// start non-blocking server,
private void startServerNonBlocking() throws IOException {
new Thread(new Runnable() {
@Override
public void run() {
try {
server = ServerSocketChannel.open();
server.socket().bind(new InetSocketAddress(serverHost, serverPort)); // bind,
server.configureBlocking(false); // non-blocking mode,
Selector selector = Selector.open();
server.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Set<SelectionKey> readyKeys = selector.selectedKeys();
// process each ready key...
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel client = server.accept();
System.out.printf("[%s]:\t%s\n", Thread.currentThread().getName(), "accept connection");
client.configureBlocking(false);
// prepare for read,
client.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// read
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer inBuf = ByteBuffer.allocate(1024);
while (client.read(inBuf) > 0) {
System.out.printf("[%s]:\t%s\n", Thread.currentThread().getName(), new String(inBuf.array(), StandardCharsets.UTF_8));
}
// prepare for write,
client.register(selector, SelectionKey.OP_WRITE);
} else if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
String response = "hi - from non-blocking server";
byte[] bs = response.getBytes(StandardCharsets.UTF_8);
ByteBuffer buffer = ByteBuffer.wrap(bs);
client.write(buffer);
// switch to read, and disable write,
client.register(selector, SelectionKey.OP_READ);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}, "t-server-threads").start();
}
// close server,
private void shutdownServer() {
try {
if (server != null) {
server.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* <p>
* tcp client - via channel,
* </p>
* <p>
* It send once request.
* </p>
*
* @throws IOException
*/
private void startClientOnce() throws IOException {
// start client in a new thread
new Thread(new Runnable() {
@Override
public void run() {
try {
SocketChannel client = SocketChannel.open(new InetSocketAddress(serverHost, serverPort));
// write
String request = "hello - from client [" + Thread.currentThread().getName() + "}";
byte[] bs = request.getBytes(StandardCharsets.UTF_8);
ByteBuffer buffer = ByteBuffer.wrap(bs);
while (buffer.hasRemaining()) {
client.write(buffer);
}
// read
ByteBuffer inBuf = ByteBuffer.allocate(1024);
while (client.read(inBuf) > 0) {
System.out.printf("[%s]:\t%s\n", Thread.currentThread().getName(), new String(inBuf.array(), StandardCharsets.UTF_8));
}
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}, "t-channelClient-" + clientSerial++).start();
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With