
Java: Asynchronous I/O channel for reading and writing lines

I have an application that synchronously reads and writes lines of text using a BufferedReader and a PrintStream that wrap the InputStream and OutputStream of a java.net.Socket object. This lets me call BufferedReader.readLine() and PrintStream.println() and leave it to the Java library to split the input into lines and format the output.
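For reference, the synchronous pattern described above can be sketched roughly as follows. The names SyncLineIo and echoLines are made up for illustration; in the real application the two streams would come from socket.getInputStream() and socket.getOutputStream().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of blocking, line-oriented I/O
public class SyncLineIo {

    /** Reads lines until end of stream and writes each one back unchanged. */
    public static void echoLines(InputStream in, OutputStream out) throws IOException {
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        // autoflush = true, so println() pushes each line out immediately
        PrintStream writer = new PrintStream(out, true, "UTF-8");

        String line;
        // readLine() blocks the calling thread until a full line (or end of stream) arrives
        while ((line = reader.readLine()) != null) {
            // println() appends the platform line separator
            writer.println(line);
        }
    }
}
```

Every readLine() call here ties up a thread while it waits, which is the cost the asynchronous version is meant to avoid.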

Now I want to replace this synchronous I/O with asynchronous I/O, so I have been looking into AsynchronousSocketChannel, which reads and writes bytes asynchronously. What I would like is a pair of wrapper classes on top of it that let me read and write whole lines as strings, still asynchronously.
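To make the gap concrete: AsynchronousSocketChannel.write() takes a ByteBuffer and may write only part of it, so a line-oriented println has to encode the string itself and re-issue the write until the buffer is drained. Below is a minimal sketch of the write half of such a wrapper. AsyncLineWriter is a hypothetical name, and UTF-8 with a "\n" terminator is an assumption, not something the channel prescribes.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

// Hypothetical wrapper: writes one line of text asynchronously
public class AsyncLineWriter {

    private final AsynchronousSocketChannel channel;

    public AsyncLineWriter(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }

    /** Writes line + "\n"; the future completes once every byte has been written. */
    public CompletableFuture<Void> println(String line) {
        // Assumption: UTF-8 encoding and a bare '\n' as line terminator
        final ByteBuffer data = ByteBuffer.wrap((line + "\n").getBytes(StandardCharsets.UTF_8));
        CompletableFuture<Void> done = new CompletableFuture<>();

        channel.write(data, done, new CompletionHandler<Integer, CompletableFuture<Void>>() {
            @Override
            public void completed(Integer written, CompletableFuture<Void> future) {
                if (data.hasRemaining()) {
                    // Partial write: send the rest with the same handler
                    channel.write(data, future, this);
                } else {
                    future.complete(null);
                }
            }

            @Override
            public void failed(Throwable error, CompletableFuture<Void> future) {
                future.completeExceptionally(error);
            }
        });
        return done;
    }
}
```

The channel allows only one outstanding write at a time (a second one throws WritePendingException), so callers of this sketch would need to wait for the returned future, or chain on it, before writing the next line.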

I cannot find such wrapper classes in the Java standard library. Before I write my own implementation: are there any libraries that wrap AsynchronousSocketChannel and provide asynchronous text I/O?

giorgio-b asked May 31 '16 12:05



1 Answer

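You can do something like this: pass a CompletionHandler to channel.read(), split the received bytes at '\n' inside the handler, and issue the next read from there.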

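import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public void nioAsyncParse(AsynchronousSocketChannel channel, final int bufferSize) {
    ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
    BufferConsumer consumer = new BufferConsumer(byteBuffer, bufferSize);
    // A socket channel has no file position: the overload is read(dst, attachment, handler)
    channel.read(consumer.buffer(), channel, consumer);
}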


class BufferConsumer implements CompletionHandler<Integer, AsynchronousSocketChannel> {

    private final ByteBuffer bytes;
    // Accumulates the current line; kept across reads because a line may span several buffers
    private final StringBuilder chars;

    public BufferConsumer(ByteBuffer byteBuffer, int bufferSize) {
        bytes = byteBuffer;
        chars = new StringBuilder(bufferSize);
    }

    public ByteBuffer buffer() {
        return bytes;
    }

    @Override
    public synchronized void completed(Integer result, AsynchronousSocketChannel channel) {
        if (result == -1) {
            // End of stream: the peer closed the connection.
            // Any unterminated text left in chars can be handled here.
            close(channel);
            return;
        }

        bytes.flip();
        while (bytes.hasRemaining()) {
            byte by = bytes.get();
            if (by == '\n') {
                // ***
                // The code used to process the line goes here
                // (the line is chars.toString(); strip a trailing '\r' if the peer sends CRLF)
                // ***
                chars.setLength(0);
            }
            else {
                // Treats each byte as one ISO-8859-1 character; multi-byte encodings need a decoder
                chars.append((char) (by & 0xFF));
            }
        }

        // A socket read can return fewer bytes than the buffer holds without the stream
        // having ended, so keep reading until a read reports -1
        bytes.clear();
        channel.read(bytes, channel, this);
    }

    @Override
    public void failed(Throwable e, AsynchronousSocketChannel channel) {
        System.err.println("Read failed: " + e);
        close(channel);
    }

    private void close(AsynchronousSocketChannel channel) {
        try {
            channel.close();
        }
        catch (IOException e) {
            // Nothing useful can be done if closing fails
        }
    }
}
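The handler above appends each byte as a char, which is only safe for single-byte text. One possible refinement, sketched here under the assumption that the peer sends an ASCII-compatible encoding such as UTF-8 or ISO-8859-1 (where the byte 0x0A can only mean '\n'), is to collect raw bytes and decode a line only once it is complete. LineAssembler is a hypothetical helper, not a JDK class.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns a stream of byte chunks into complete lines
public class LineAssembler {

    // Bytes of the current, not yet terminated line
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private final Charset charset;

    public LineAssembler(Charset charset) {
        this.charset = charset;
    }

    /**
     * Consumes all remaining bytes of a buffer that is ready for reading
     * (already flipped) and returns the lines completed by this chunk.
     */
    public List<String> feed(ByteBuffer buffer) {
        List<String> lines = new ArrayList<>();
        while (buffer.hasRemaining()) {
            byte b = buffer.get();
            if (b == '\n') {
                lines.add(takeLine());
            } else {
                pending.write(b);
            }
        }
        return lines;
    }

    // Decodes the pending bytes as one line, dropping a trailing '\r' (CRLF input)
    private String takeLine() {
        byte[] raw = pending.toByteArray();
        pending.reset();
        int length = raw.length;
        if (length > 0 && raw[length - 1] == '\r') {
            length--;
        }
        return new String(raw, 0, length, charset);
    }
}
```

Inside completed(), after bytes.flip(), the per-byte loop could then be replaced by iterating over assembler.feed(bytes) and processing each returned string. Because decoding happens per line, a multi-byte character that is split across two reads is still decoded correctly.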
Serg M Ten answered Oct 20 '22 00:10
