Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to properly synchronize concurrent reads and writes on a AsynchronousSocketChannel

I am trying to implement a single request/response on a AsynchronousSocketChannel in a vert.x worker verticle using CompletionHandler not Futures. From the vert.x documentation:

"Worker verticles are never executed concurrently by more than one thread."

So here is my code (not sure I got the socket handling 100% right - please comment):

    // ommitted: asynchronousSocketChannel.open, connect ...

    eventBus.registerHandler(address, new Handler<Message<JsonObject>>() {
        @Override
        public void handle(final Message<JsonObject> event) {
            final ByteBuffer receivingBuffer = ByteBuffer.allocateDirect(2048);
            final ByteBuffer sendingBuffer = ByteBuffer.wrap("Foo".getBytes());

            asynchronousSocketChannel.write(sendingBuffer, 0L, new CompletionHandler<Integer, Long>() {
                public void completed(final Integer result, final Long attachment) {
                    if (sendingBuffer.hasRemaining()) {
                        long newFilePosition = attachment + result;
                        asynchronousSocketChannel.write(sendingBuffer, newFilePosition, this);
                    }

                    asynchronousSocketChannel.read(receivingBuffer, 0L, new CompletionHandler<Integer, Long>() {
                        CharBuffer charBuffer = null;
                        final Charset charset = Charset.defaultCharset();
                        final CharsetDecoder decoder = charset.newDecoder();

                        public void completed(final Integer result, final Long attachment) {
                            if (result > 0) {
                                long p = attachment + result;
                                asynchronousSocketChannel.read(receivingBuffer, p, this);
                            }

                            receivingBuffer.flip();

                            try {
                                charBuffer = decoder.decode(receivingBuffer);
                                event.reply(charBuffer.toString()); // pseudo code
                            } catch (CharacterCodingException e) { }


                        }

                        public void failed(final Throwable exc, final Long attachment) { }
                    });
                }

                public void failed(final Throwable exc, final Long attachment) { }
            });
        }
    });

I am hitting a lot of ReadPendingException's and WritePendingException's during load testing which seems a bit strange if there is really only one thread at a time in the handle method. How can it be that a read or a write has not fully completed if there is only 1 thread working with the AsynchronousSocketChannel at a time?

like image 621
reikje Avatar asked Nov 13 '22 22:11

reikje


1 Answers

Handlers from AsynchronousSocketChannel are executed on their own AsynchronousChannelGroup which is a derivative of ExecutorService. Unless you make special efforts, that handlers are executed in parallel with the code which started I/O operation.

To execute I/O completion handler within a verticle, you have to make and register a handler from that verticle which does what AsynchronousSocketChannel's handler do now.

The AsynchronousSocketChannel's handler should only pack its arguments (result and attachment) in a message and sent that message to the event bus.

like image 123
Alexei Kaigorodov Avatar answered Nov 15 '22 12:11

Alexei Kaigorodov