Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Implementing keep-alive messages in Netty using WriteTimeoutHandler

Tags:

java

netty

I am using Netty 3.2.7. I am trying to write functionality in my client such that if no messages are written after a certain amount of time (say, 30 seconds), a "keep-alive" message is sent to the server.

After some digging, I found that WriteTimeoutHandler should enable me to do this. I found this explanation here: https://issues.jboss.org/browse/NETTY-79.

The example given in the Netty documentation is:

public ChannelPipeline getPipeline() {
     // An example configuration that implements 30-second write timeout:
     return Channels.pipeline(
         new WriteTimeoutHandler(timer, 30), // timer must be shared.
         new MyHandler());
 }

In my test client, I have done just this. In MyHandler, I also overrided the exceptionCaught() method:

public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
    if (e.getCause() instanceof WriteTimeoutException) {
        log.info("Client sending keep alive!");
        ChannelBuffer keepAlive = ChannelBuffers.buffer(KEEP_ALIVE_MSG_STR.length());
        keepAlive.writeBytes(KEEP_ALIVE_MSG_STR.getBytes());
        Channels.write(ctx, Channels.future(e.getChannel()), keepAlive);
    }
}

No matter what duration the client does not write anything to the channel, the exceptionCaught() method I have overridden is never called.

Looking at the source of WriteTimeoutHandler, its writeRequested() implementation is:

public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
        throws Exception {

    long timeoutMillis = getTimeoutMillis(e);
    if (timeoutMillis > 0) {
        // Set timeout only when getTimeoutMillis() returns a positive value.
        ChannelFuture future = e.getFuture();
        final Timeout timeout = timer.newTimeout(
                new WriteTimeoutTask(ctx, future),
                timeoutMillis, TimeUnit.MILLISECONDS);

        future.addListener(new TimeoutCanceller(timeout));
    }

    super.writeRequested(ctx, e);
}

Here, it seems that this implementation says, "When a write is requested, make a new timeout. When the write succeeds, cancel the timeout."

Using a debugger, it does seem that this is what is happening. As soon as the write completes, the timeout is cancelled. This is not the behavior I want. The behavior I want is: "If the client has not written any information to the channel for 30 seconds, throw a WriteTimeoutException."

So, is this not what WriteTimeoutHandler is for? This is how I interpreted it from what I've read online, but the implementation does not seem to work this way. Am I using it wrong? Should I use something else? In our Mina version of the same client I am trying to rewrite, I see that the sessionIdle() method is overridden to achieve the behavior I want, but this method is not available in Netty.

like image 938
ImmuneEntity Avatar asked Feb 01 '12 19:02

ImmuneEntity


2 Answers

For Netty 4.0 and newer, you should extend ChannelDuplexHandler like in example from IdleStateHandler documentation :

 // An example that sends a ping message when there is no outbound traffic
 // for 30 seconds.  The connection is closed when there is no inbound traffic
 // for 60 seconds.

 public class MyChannelInitializer extends ChannelInitializer<Channel> {
     @Override
     public void initChannel(Channel channel) {
         channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0));
         channel.pipeline().addLast("myHandler", new MyHandler());
     }
 }

 // Handler should handle the IdleStateEvent triggered by IdleStateHandler.
 public class MyHandler extends ChannelDuplexHandler {
     @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
         if (evt instanceof IdleStateEvent) {
             IdleStateEvent e = (IdleStateEvent) evt;
             if (e.state() == IdleState.READER_IDLE) {
                 ctx.close();
             } else if (e.state() == IdleState.WRITER_IDLE) {
                 ctx.writeAndFlush(new PingMessage());
             }
         }
     }
 }
like image 200
user11153 Avatar answered Oct 31 '22 12:10

user11153


I would suggest to add the IdleStateHandler and then add your custom implementation of IdleStateAwareUpstreamHandler which can react on the idle state. This works out very well for me on many different projects.

The javadocs list the following example, that you could use as the base of your implementation:

public class MyPipelineFactory implements ChannelPipelineFactory {

    private final Timer timer;
    private final ChannelHandler idleStateHandler;

    public MyPipelineFactory(Timer timer) {
        this.timer = timer;
        this.idleStateHandler = new IdleStateHandler(timer, 60, 30, 0);
        // timer must be shared.
    }

    public ChannelPipeline getPipeline() {
        return Channels.pipeline(
            idleStateHandler,
            new MyHandler());
    }
}

// Handler should handle the IdleStateEvent triggered by IdleStateHandler.
public class MyHandler extends IdleStateAwareChannelHandler {

    @Override
    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
        if (e.getState() == IdleState.READER_IDLE) {
            e.getChannel().close();
        } else if (e.getState() == IdleState.WRITER_IDLE) {
            e.getChannel().write(new PingMessage());
        }
    }
}

ServerBootstrap bootstrap = ...;
Timer timer = new HashedWheelTimer();
...
bootstrap.setPipelineFactory(new MyPipelineFactory(timer));
...
like image 41
Norman Maurer Avatar answered Oct 31 '22 13:10

Norman Maurer