
Netty messageReceived timeout

I need my messageReceived (or channelRead0 for Netty 4.0) method in my ChannelHandler to time out after a certain time threshold. I have tried ReadTimeoutHandler and WriteTimeoutHandler, but no exception was generated when the processing time in messageReceived exceeded the timeout. Here's what I tried:

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();

        p.addLast(new HttpRequestDecoder());
        p.addLast(new HttpResponseEncoder());
        p.addLast(new HttpObjectAggregator(1048576));
        p.addLast(new HttpContentCompressor());
        p.addLast("readTimeoutHandler", new ReadTimeoutHandler(1));
        //p.addLast("idleTimeoutHandler", new IdleStateHandler(1, 1, 1));
        p.addLast(new HttpRequestHandler());
      }
  }



public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
  public static class Call implements Callable<Boolean> {
    public Boolean call() {
        for(long i = 0; i<100000000;i++){
            for(long j = 0; j<100;j++){

            }
        }
        return true;        
    }
 }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    System.out.println("** Exception caught **");
    if (cause instanceof ReadTimeoutException) {
       System.out.println("*** Request timed out ***");
    } 
    else if (cause instanceof WriteTimeoutException) {
           System.out.println("*** Request timed out on write ***");
        } 
    cause.printStackTrace();
    ctx.close();
  }

   @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

        FullHttpRequest request = this.request = (FullHttpRequest) msg;
        /*Callable<Boolean> callable = new Call();

    ScheduledExecutorService scheduler =
             Executors.newScheduledThreadPool(1);

    ScheduledFuture<Boolean> handle = scheduler.schedule(callable, 4, TimeUnit.SECONDS);
    boolean v  = handle.get();*/
    for(long i = 0; i<100000000;i++){
        for(long j = 0; j<100;j++){

        }
    }
    System.out.println("Wait done");

        try{
            CharSequence content = appController.handleRequest(
                    url,
                    ctx.channel().remoteAddress().toString(),
                    parseURL(url), reqBody);
            if(content!=null){
                writeResponse(ctx, HttpResponseStatus.OK, content);
            }
        }catch(AppRuntimeException e){
            CharSequence content = e.getMessage(); 
            if(content != null){
                OptHttpStatus status = e.getOptHttpStatus();
                writeResponse(ctx, HttpResponseStatus.valueOf(status.getCode()), content);
            }
        }
    }

   private void writeResponse(ChannelHandlerContext ctx, HttpResponseStatus status, CharSequence content) {
    // Decide whether to close the connection or not.
    boolean keepAlive = HttpUtil.isKeepAlive(request);

    // Build the response object.
    FullHttpResponse response = new DefaultFullHttpResponse(
            HTTP_1_1, 
            status,
            Unpooled.copiedBuffer(content, CharsetUtil.UTF_8));
            //Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));
    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8"); 
    response.headers().set("Access-Control-Allow-Origin", "*");

    if (keepAlive) {
        //Add 'Content-Length' header only for a keep-alive connection.
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        // Add keep alive header as per:
        // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }
    // Write the response.
    ChannelFuture ch = ctx.writeAndFlush(response);

    if(!keepAlive){
        ch.addListener(ChannelFutureListener.CLOSE);
    }
}
    }

I added a redundant for loop in the channelRead0 method to simulate a long processing time, but no timeout exception was generated. I also tried scheduling the wait, but still didn't get a timeout exception. Can you suggest any solutions?

user1051577 asked Oct 29 '22 18:10


1 Answer

There are two problems with what you are trying to do. The first is that you block the thread instead of using an asynchronous delay through scheduling. The second is that you need to go back to using IdleStateHandler.

Don't sleep or block

Instead of Thread.sleep or a busy loop, try scheduling your work with a delay. In Netty, the timeout check runs on the same thread that you put to sleep, so you still end up writing a response before the check can happen. If you schedule the delay instead, the thread is free to detect the timeout and trigger the exception.

Keep in mind that Netty uses one IO thread for many channels, so you shouldn't do any blocking or synchronous work on that thread. Thread.sleep, busy loops, synchronous calls, and blocking calls (such as Future.get()) should not run on the IO thread, because they hurt the performance of other channels.
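The effect described above can be reproduced with the plain JDK, without Netty. The sketch below is only an analogy: it assumes a single-threaded ScheduledExecutorService is a fair stand-in for one Netty IO thread, and the class and method names (EventLoopStarvationDemo, timeoutSeenByHandler) are made up for illustration. A "timeout" task is armed for 100 ms, and the "handler" takes 500 ms either by blocking the loop thread or by scheduling its completion.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class EventLoopStarvationDemo {

    /**
     * Returns true if the timeout task had already run when the handler finished.
     * blockInHandler = true mimics slow work done directly on the loop thread.
     */
    static boolean timeoutSeenByHandler(boolean blockInHandler) {
        // Hypothetical stand-in for a single Netty IO thread
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean timedOut = new AtomicBoolean(false);
        AtomicBoolean seen = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);

        // What the "handler" does once its work is finished
        Runnable finish = () -> {
            seen.set(timedOut.get());
            done.countDown();
        };

        try {
            loop.execute(() -> {
                // Stand-in for a timeout handler arming its timer on the same thread
                loop.schedule(() -> timedOut.set(true), 100, TimeUnit.MILLISECONDS);
                if (blockInHandler) {
                    sleep(500);      // the loop thread is busy, so the timer cannot run
                    finish.run();
                } else {
                    // The loop thread stays free, so the timer runs at about 100 ms
                    loop.schedule(finish, 500, TimeUnit.MILLISECONDS);
                }
            });
            done.await(5, TimeUnit.SECONDS);
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            loop.shutdownNow();
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking handler saw timeout:  " + timeoutSeenByHandler(true));
        System.out.println("scheduled handler saw timeout: " + timeoutSeenByHandler(false));
    }
}
```

In the blocking case the timer task stays queued behind the handler, which is the same reason no timeout exception appears when channelRead0 spins in a loop.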

You can use the context to get hold of an executor on which to schedule your delayed work.

public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
    ctx.executor().schedule(new Runnable() {
        @Override
        public void run() {
            // Put your try/catch in here
        }
    }, 5000, TimeUnit.MILLISECONDS);
}

If You MUST Make A Blocking Call

If you can't avoid making a blocking call or doing some intense processing, then use a different EventExecutorGroup when adding your processing handler to allow that work to be done asynchronously from the IO Worker thread. Be sure to provide it with enough threads for your expected workload and number of connections.

The example code below should work with your Thread.sleep or busy loops. Just be sure to define OFFLOAD_THREADS, or replace it with a number that meets your needs.

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
    private final EventExecutorGroup executors = new DefaultEventExecutorGroup(OFFLOAD_THREADS);
    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();

        p.addLast(new HttpRequestDecoder());
        p.addLast(new HttpResponseEncoder());
        p.addLast(new HttpObjectAggregator(1048576));
        p.addLast(new HttpContentCompressor());
        p.addLast("idleTimeoutHandler", new IdleStateHandler(0, 1, 0));
        p.addLast(executors, new HttpRequestHandler());
    }
}

Use IdleStateHandler

The WriteTimeoutHandler is meant to time out if a write takes too long. It won't even start timing until you start your first write. It looks like you are trying to cause a delay before writing even starts, so the WriteTimeoutHandler won't trigger for you, even if you stop using sleep as suggested above.

If you really want to time out based on how long it takes you to start writing, then you should use IdleStateHandler and handle the user event it triggers. Unlike WriteTimeoutHandler, IdleStateHandler starts counting when the channel becomes active instead of waiting for a write to start, so it will trigger if your processing takes too long (but only if you do your processing asynchronously).

Make sure you catch and react to the user event when using IdleStateHandler:

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
        // Handle the event here
    }
    else {
        super.userEventTriggered(ctx, evt);
    }
}
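
One thing to watch for when reacting to the idle event: the asynchronous work may still finish later, and only one of the two should produce the response. The plain-JDK sketch below illustrates that "first completion wins" idea with a CompletableFuture. It is not Netty code, and the names (DeadlineDemo, respondWithin, slowWork) and the status strings are hypothetical. The join() call is there only so the demo can return a value; on a Netty IO thread you would attach a callback rather than block.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class DeadlineDemo {

    /** Returns the result of work, or a fallback status if the deadline passes first. */
    static String respondWithin(long timeoutMillis, Supplier<String> work) {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> response = new CompletableFuture<>();
        try {
            // The slow work runs off the timer thread
            workers.execute(() -> {
                response.complete(work.get());
            });
            // Stand-in for the idle event; complete() is a no-op if the work already won
            timer.schedule(() -> {
                response.complete("503 Service Unavailable");
            }, timeoutMillis, TimeUnit.MILLISECONDS);
            // Blocking here is for the demo only
            return response.join();
        } finally {
            workers.shutdownNow();
            timer.shutdownNow();
        }
    }

    /** Simulated long-running request processing (about two seconds). */
    static String slowWork() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "200 OK";
    }

    public static void main(String[] args) {
        System.out.println(respondWithin(2000, () -> "200 OK"));      // work finishes first
        System.out.println(respondWithin(200, DeadlineDemo::slowWork)); // deadline fires first
    }
}
```

Because complete() only takes effect once, a late result from the worker is silently dropped instead of being written as a second response.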

Chris O'Toole answered Nov 15 '22 06:11