Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to keep data with each channel on NIO Server

Tags:

java

nio

I have a Java NIO server which receives data from clients.

When a channel is ready for read i.e key.isReadable() return true read(key) is called to read data.

Currently I am using a single read buffer for all channels and in read() method , I clear the buffer and read into it and then finally put into a byte array , supposing that I will get all data in one shot.

But let's say I do not get complete data in one shot(I have special characters at data ending to detect).

Problem :

So now how to keep this partial data with channel or how to deal with partial read problem ? or globally ?

I read somewhere attachments are not good.

like image 416
cruxion effux Avatar asked Mar 16 '23 16:03

cruxion effux


1 Answers

Take a look at the Reactor pattern. Here is a link to basic implementation by professor Doug Lea:

http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

The idea is to have single reactor thread which blocks on Selector call. Once there are IO events ready, reactor thread dispatches the events to appropriate handlers. In pdf above, there is inner class Acceptor within Reactor which accepts new connections.

Author uses single handler for read and write events and maintains state of this handler. I prefer to have separate handlers for reads and writes but this is not as easy to work with as with 'state machine'. There can be only one Attachment per event, so some kind of injection is needed to switch read/write handlers.

To maintain state between subsequent read/writes you will have to do couple of things:

  • Introduce custom protocol which tells you when the message is fully read
  • Have timeout or cleanup mechanism for stale connections
  • Maintain client specific sessions

So, you can do something like this:

public class Reactor implements Runnable{

    Selector selector = Selector.open();

    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

    public Reactor(int port) throws IOException {

        serverSocketChannel.socket().bind(new InetSocketAddress(port));

        serverSocketChannel.configureBlocking(false);

        // let Reactor handle new connection events
        registerAcceptor();

    }

    /**
     * Registers Acceptor as handler for new client connections.
     * 
     * @throws ClosedChannelException
     */
    private void registerAcceptor() throws ClosedChannelException {


        SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        selectionKey0.attach(new Acceptor());
    }

    @Override
    public void run(){

        while(!Thread.interrupted()){

            startReactorLoop();

        }

    }

    private void startReactorLoop() {

        try {

            // wait for new events for each registered or new clients
            selector.select();

            // get selection keys for pending events
            Set<SelectionKey> selectedKeys = selector.selectedKeys();

            Iterator<SelectionKey> selectedKeysIterator = selectedKeys.iterator();

            while (selectedKeysIterator.hasNext()) {

                // dispatch even to handler for the given key
                dispatch(selectedKeysIterator.next());

                // remove dispatched key from the collection
                selectedKeysIterator.remove();
            }

        } catch (IOException e) {
            // TODO add handling of this exception
            e.printStackTrace();
        }
    }

    private void dispatch(SelectionKey interestedEvent) {

        if (interestedEvent.attachment() != null) {

            EventHandler handler = (EventHandler) interestedEvent.attachment();

            handler.processEvent();
        }

    }

    private class Acceptor implements EventHandler {

        @Override
        public void processEvent() {

            try {

                SocketChannel clientConnection = serverSocketChannel.accept();

                if (clientConnection != null) {

                    registerChannel(clientConnection);

                }

            } catch (IOException e) {e.printStackTrace();}

        }
    /**
     *  Save Channel - key association - in Map perhaps.
     * This is required for subsequent/partial reads/writes
     */
    private void registerChannel(SocketChannel clientChannel) {


        // notify injection mechanism of new connection (so it can activate Read Handler)
}

Once read event is handled, notify injection mechanism that write handler can be injected.

New instances of read and write handlers are created by the injection mechanism once, when new Connection is available. This injection mechanism switches handlers as needed. Lookup of handlers for each Channel is done from the Map that is filled at the connection Acceptance by the method `registerChannel().

Read and write handlers have ByteBuffer instances, and since each Socket Channel has its own pair of handlers, you can now maintain state between partial reads and writes.

Two tips to improve performance:

  • Try to do first read immediately when connection is accepted. Only if you don't read enough data as defined by header in your custom protocol, register Channel interest for read events.

  • Try to do write first without registering interest for write events and only if you don't write all the data, register interest for write.

This will reduce number of Selector wakeups.

Something like this:

SocketChannel socketChannel;

byte[] outData;

final static int MAX_OUTPUT = 1024;

ByteBuffer output = ByteBuffer.allocate(MAX_OUTPUT);

// if message was not written fully
if (socketChannel.write(output) < messageSize()) {

// register interest for write event
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_WRITE); 
        selectionKey.attach(writeHandler);
        selector.wakeup();

}

Finally, there should be timed Task which checks if Connections are still alive/SelectionKeys are canceled. If client breaks TCP connection, server will usually not know of this. As a result, there will be number of Event handlers in memory, bind as Attachments to stale connections which will result with memory leak.

This is the reason why you may say Attachments are not good, but the issue can be dealt with.

To deal with this here are two simple ways:

  • TCP keep alive could be enabled

  • periodic task could check timestamp of last activity on the given Channel. If it is idle for to long, server should terminate connection.

like image 152
John Avatar answered Mar 21 '23 19:03

John