io.netty.util.IllegalReferenceCountException: refCnt: 0 in Netty

Tags: java, netty
I am trying to create a proxy that also passes all the traffic on to other handlers in Netty. I understand that I should manage the references to the ByteBuf, but I cannot work out how to do it. My example and the exception are below.

Initializer:

public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {

    private final SslContext sslCtx;

    private final String remoteHost;

    private final int remotePort;

    public HexDumpProxyInitializer(SslContext sslCtx, String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.sslCtx = sslCtx;
    }

    public HexDumpProxyInitializer(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.sslCtx = null;
    }

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        if (sslCtx != null) {
            p.addLast(sslCtx.newHandler(ch.alloc()));
        }

        p.addLast(new HexDumpProxyFrontendHandler(remoteHost, remotePort));
        p.addLast(new InboundPrinterHandler());
    }
}

HexDumpProxyFrontendHandler

public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {

    private final String remoteHost;
    private final int remotePort;

    private Channel outboundChannel;

    public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        final Channel inboundChannel = ctx.channel();

        // Start the connection attempt.
        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop())
         .channel(ctx.channel().getClass())
         .handler(new HexDumpProxyBackendHandler(inboundChannel))
         .option(ChannelOption.AUTO_READ, false);
        ChannelFuture f = b.connect(remoteHost, remotePort);
        outboundChannel = f.channel();
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // connection complete start to read first data
                    inboundChannel.read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    inboundChannel.close();
                }
            }
        });
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        if (outboundChannel.isActive()) {
            outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.channel().close();
                    }
                }
            });
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (outboundChannel != null) {
            closeOnFlush(outboundChannel);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
    }

    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

InboundPrinterHandler

public class InboundPrinterHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf bb = null;
        bb = (ByteBuf) msg;
        System.out.println("INBOUND:\n\n"+bb.toString(Charset.defaultCharset()));
        System.out.println("\n\n\n");
    }


    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

    }


}

The Exception

io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1407)
    at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1353)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.internalNioBuffer(PooledUnsafeDirectByteBuf.java:331)
    at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:614)
    at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1213)
    at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1208)
    at com.netas.sv.proxy.InboundPrinterHandler.channelRead(InboundPrinterHandler.java:16)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
    at com.netas.sv.proxy.HexDumpProxyFrontendHandler.channelRead(HexDumpProxyFrontendHandler.java:67)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)
Erkan Erol asked Dec 18 '22 10:12


1 Answer

    if (outboundChannel.isActive()) {
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            // Snip
        });
    }
    ctx.fireChannelRead(msg);

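Once you hand the ByteBuf to the other channel with writeAndFlush, it is that channel's responsibility to decrement the reference count, and it does so after writing the buffer. The buffer starts with a reference count of 1, so that release drops it to 0 and the buffer is no longer usable. The ctx.fireChannelRead(msg) call then passes the already released buffer to InboundPrinterHandler, where bb.toString(...) throws the IllegalReferenceCountException.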
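The sequence can be illustrated with a toy model. This is only a sketch: CountedBuffer, writeAndRelease and secondConsumerCanRead are hypothetical names made up for illustration, and the class is a simplified stand-in, not Netty's actual ByteBuf implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountModel {

    // Toy stand-in for a reference-counted buffer (NOT Netty's implementation).
    static final class CountedBuffer {
        private final AtomicInteger refCnt = new AtomicInteger(1); // a new buffer starts at 1
        private final String data;

        CountedBuffer(String data) {
            this.data = data;
        }

        int refCnt() {
            return refCnt.get();
        }

        // Decrements the count; returns true when it reaches 0.
        boolean release() {
            ensureAccessible();
            return refCnt.decrementAndGet() == 0;
        }

        // Any access to a buffer whose count is 0 is rejected.
        String read() {
            ensureAccessible();
            return data;
        }

        private void ensureAccessible() {
            if (refCnt.get() == 0) {
                throw new IllegalStateException("refCnt: 0");
            }
        }
    }

    // Models the outbound write: the writer releases the buffer when it is done.
    static void writeAndRelease(CountedBuffer buf) {
        buf.release();
    }

    // Models the question's channelRead: write first, then let a second consumer read.
    static boolean secondConsumerCanRead() {
        CountedBuffer buf = new CountedBuffer("payload");
        writeAndRelease(buf); // count goes 1 -> 0
        try {
            buf.read();       // the second consumer touches a released buffer
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("second consumer can read: " + secondConsumerCanRead());
    }
}
```

There is one reference but two consumers, so whichever consumer runs second finds the count at 0.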

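The best way to solve this is to increment the reference count manually before you pass the traffic to the other channel. Because msg is declared as Object in channelRead, msg.retain() does not compile as written; use ReferenceCountUtil.retain(msg), or cast msg to ByteBuf and call .retain() on it:

outboundChannel.writeAndFlush(ReferenceCountUtil.retain(msg)).addListener(new ChannelFutureListener() {
// Your remaining code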
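A minimal sketch of the resulting reference-count bookkeeping, assuming Netty 4.x is on the classpath. RetainBeforeWrite, simulateOutboundWrite, decodeAndRelease and forwardAndPrint are hypothetical names used only for illustration; simulateOutboundWrite stands in for the release that the outbound channel performs after writing. The sketch also releases the buffer in the last consumer, since a handler that extends ChannelInboundHandlerAdapter and does not pass the message on is the one left holding the final reference.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.nio.charset.StandardCharsets;

public class RetainBeforeWrite {

    // Hypothetical stand-in for the outbound write: the writing side
    // releases the buffer once it is done with it.
    static void simulateOutboundWrite(Object msg) {
        ReferenceCountUtil.release(msg);
    }

    // Hypothetical stand-in for InboundPrinterHandler.channelRead: decode the
    // bytes, then release the buffer because no later handler will consume it.
    static String decodeAndRelease(Object msg) {
        ByteBuf bb = (ByteBuf) msg;
        try {
            return bb.toString(StandardCharsets.UTF_8);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    // Mirrors the corrected channelRead: retain, write, then pass the message on.
    static String forwardAndPrint(ByteBuf buf) {
        Object msg = buf;                                      // channelRead receives an Object
        simulateOutboundWrite(ReferenceCountUtil.retain(msg)); // count goes 1 -> 2 -> 1
        return decodeAndRelease(msg);                          // still readable, then 1 -> 0
    }

    public static void main(String[] args) {
        ByteBuf buf = Unpooled.copiedBuffer("hello", StandardCharsets.UTF_8);
        String text = forwardAndPrint(buf);
        System.out.println(text + ", refCnt after both consumers: " + buf.refCnt());
    }
}
```

Each consumer now owns exactly one reference: the retain() pays for the outbound write, and the original reference is released by the printing side. Without that final release the count would stay at 1 and the buffer would leak.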
Ferrybig answered Dec 29 '22 00:12