Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to execute business logic handler in a separate thread pool using netty

I have a handler that needs to execute some business logic and I want that to be executed in a separate thread pool to not block the io event loop. I have added DefaultEventExecutorGroup into the pipeline as specified in http://netty.io/4.0/api/io/netty/channel/ChannelPipeline.html javadoc and http://netty.io/wiki/new-and-noteworthy-in-4.0.html#no-more-executionhandler---its-in-the-core wiki:

ch.pipeline().addLast(new DefaultEventExecutorGroup(10), new ServerHandler());

Just for testing purposes my ServerHandler just puts the current thread to sleep for 5 seconds:

protected void channelRead0(ChannelHandlerContext ctx, Command cmd) throws Exception {
    System.out.println("Starting.");

    try {
        Thread.currentThread().sleep(5000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    System.out.println("Finished.");        
}

But apparently the business logic is still executed synchronously:

Starting.
Finished.
Starting.
Finished.
Starting.
Finished.

What am I missing?

like image 623
A-Z Avatar asked Jun 06 '17 19:06

A-Z


1 Answers

In case your goal is not to block IO event loop - you did it right. But due to netty specific, your handler will be always attached to the same thread of EventExecutorGroup and thus behavior you described above is expected.

In case you want to execute blocking operation in parallel as soon as it arrives you need to use the another way - separate ThreadPoolExecutor. Like this:

ch.pipeline().addLast(new ServerHandler(blockingThreadPool));

where blockingThreadPool is regular ThreadPoolExecutor.

For example:

ExecutorService blockingThreadPool = Executors.newFixedThreadPool(10);

Now, within your logic handler you can submit blocking tasks to this executor like this:

protected void channelRead0(ChannelHandlerContext ctx, Command cmd) throws Exception {

    blockingIOProcessor.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("Starting.");

            try {
                Thread.currentThread().sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            System.out.println("Finished.");
        }
    });

}

You can also pass context to this runnable in order to return the response back when processing is finished if needed.

like image 92
Dmitriy Dumanskiy Avatar answered Sep 19 '22 21:09

Dmitriy Dumanskiy