I am trying to implement a single request/response on a AsynchronousSocketChannel in a vert.x worker verticle using CompletionHandler not Futures. From the vert.x documentation:
"Worker verticles are never executed concurrently by more than one thread."
So here is my code (not sure I got the socket handling 100% right - please comment):
// ommitted: asynchronousSocketChannel.open, connect ...
eventBus.registerHandler(address, new Handler<Message<JsonObject>>() {
@Override
public void handle(final Message<JsonObject> event) {
final ByteBuffer receivingBuffer = ByteBuffer.allocateDirect(2048);
final ByteBuffer sendingBuffer = ByteBuffer.wrap("Foo".getBytes());
asynchronousSocketChannel.write(sendingBuffer, 0L, new CompletionHandler<Integer, Long>() {
public void completed(final Integer result, final Long attachment) {
if (sendingBuffer.hasRemaining()) {
long newFilePosition = attachment + result;
asynchronousSocketChannel.write(sendingBuffer, newFilePosition, this);
}
asynchronousSocketChannel.read(receivingBuffer, 0L, new CompletionHandler<Integer, Long>() {
CharBuffer charBuffer = null;
final Charset charset = Charset.defaultCharset();
final CharsetDecoder decoder = charset.newDecoder();
public void completed(final Integer result, final Long attachment) {
if (result > 0) {
long p = attachment + result;
asynchronousSocketChannel.read(receivingBuffer, p, this);
}
receivingBuffer.flip();
try {
charBuffer = decoder.decode(receivingBuffer);
event.reply(charBuffer.toString()); // pseudo code
} catch (CharacterCodingException e) { }
}
public void failed(final Throwable exc, final Long attachment) { }
});
}
public void failed(final Throwable exc, final Long attachment) { }
});
}
});
I am hitting a lot of ReadPendingException's and WritePendingException's during load testing which seems a bit strange if there is really only one thread at a time in the handle method. How can it be that a read or a write has not fully completed if there is only 1 thread working with the AsynchronousSocketChannel at a time?
Handlers from AsynchronousSocketChannel are executed on their own AsynchronousChannelGroup which is a derivative of ExecutorService. Unless you make special efforts, that handlers are executed in parallel with the code which started I/O operation.
To execute I/O completion handler within a verticle, you have to make and register a handler from that verticle which does what AsynchronousSocketChannel's handler do now.
The AsynchronousSocketChannel's handler should only pack its arguments (result and attachment) in a message and sent that message to the event bus.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With