I try to create a proxy and then pass all the trafic other handlers in Netty. I understand I should manage the references to ByteBuf but I cannot understand how to do it. My example and the exception is below.
Initializer:
public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {
private final SslContext sslCtx;
private final String remoteHost;
private final int remotePort;
public HexDumpProxyInitializer(SslContext sslCtx, String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
this.sslCtx = sslCtx;
}
public HexDumpProxyInitializer(String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
this.sslCtx = null;
}
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new HexDumpProxyFrontendHandler(remoteHost, remotePort));
p.addLast(new InboundPrinterHandler());
}
}
HexDumpProxyFrontendHandler
public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private final String remoteHost;
private final int remotePort;
private Channel outboundChannel;
public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
final Channel inboundChannel = ctx.channel();
// Start the connection attempt.
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(new HexDumpProxyBackendHandler(inboundChannel))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(remoteHost, remotePort);
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
}
});
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (outboundChannel.isActive()) {
outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
ctx.fireChannelRead(msg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (outboundChannel != null) {
closeOnFlush(outboundChannel);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
closeOnFlush(ctx.channel());
}
static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
}
InboundPrinterHandler
public class InboundPrinterHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf bb = null;
bb = (ByteBuf) msg;
System.out.println("INBOUND:\n\n"+bb.toString(Charset.defaultCharset()));
System.out.println("\n\n\n");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
}
The Exception
io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1407)
at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1353)
at io.netty.buffer.PooledUnsafeDirectByteBuf.internalNioBuffer(PooledUnsafeDirectByteBuf.java:331)
at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:614)
at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1213)
at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1208)
at com.netas.sv.proxy.InboundPrinterHandler.channelRead(InboundPrinterHandler.java:16)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at com.netas.sv.proxy.HexDumpProxyFrontendHandler.channelRead(HexDumpProxyFrontendHandler.java:67)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
if (outboundChannel.isActive()) { outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() { // Snip } ctx.fireChannelRead(msg);
After you pass away the ByteBuf to the other channel, it is the other channels responsibility to decrement the refcount again. Because the other channel has now decremented the the refcount, it is now unuseable.
The best way to solve this is manually incrementing the value before you pass the traffic to the other channel using .retain()
:
outboundChannel.writeAndFlush(msg.retain()).addListener(new ChannelFutureListener() {
// Your remaining code
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With