Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Send multiple asynchonous requests on a Netty client

first, let me explain the context :

I've got to create a client which will send many HTTP requests to download images. These requests has to be asynchronous because as soon as an image is completed it'll be added to a queue and then print to screen. Because images can be large and responses chunked, my handler have to aggregate it into a buffer.

So I follow the Netty examples codes (HTTP spoon example).

Currently, I've got three static Map to store for each channels the channel ID and the buffer/chunk boolean/my final object.

private static final ConcurrentHashMap<Integer, ChannelBuffer> BUFFER_MAP = new ConcurrentHashMap<Integer, ChannelBuffer>();
private static final ConcurrentHashMap<Integer, ImagePack> PACK_MAP = new ConcurrentHashMap<Integer, ImagePack>();
private static final ConcurrentHashMap<Integer, Boolean> CHUNKS_MAP = new ConcurrentHashMap<Integer, Boolean>();

After that, I create my bootstrap client and counter to countDown the number of pending requests. The final queue and the counter are passed to my Handler for when the response image is complet.

    final ClientBootstrap bootstrap = new ClientBootstrap(
            new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool()));
    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("reuseAddress", true);
    bootstrap.setOption("connectTimeoutMillis", 30000);


    final CountDownLatch latch = new CountDownLatch(downloadList.size()) {

        @Override
        public void countDown() {
            super.countDown();
            if (getCount() <= 0) {
                try {
                    queue.put(END_OF_QUEUE);
                    bootstrap.releaseExternalResources();
                } catch (InterruptedException ex) {
                    LOGGER.log(Level.WARNING, ex.getMessage(), ex);
                }
            }
        }
    };
    bootstrap.getPipeline().addLast("codec", new HttpClientCodec());
    bootstrap.getPipeline().addLast("handler", new TileClientHandler(queue, latch));

After that I create a Channel for each image to download and when the channel is connected, the request will be created and send. The host and port have already been extracted before.

for (final ImagePack pack : downloadList) {

        final ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

        future.addListener(new ChannelFutureListener() {

            public void operationComplete(ChannelFuture cf) throws Exception {

                final Channel channel = future.getChannel();

                PACK_MAP.put(channel.getId(), pack);

                final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, pack.url);
                request.setHeader(HttpHeaders.Names.HOST, host);
                request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
                request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.BYTES);

                if (channel.isWritable()) {
                    channel.write(request);
                }
            }
        });
    }

Now, this is my ChannelHandler which is an inner class that extend SimpleChannelUpstreamHandler. When the channel is connected, a new entry in BUFFER_MAP and in CHUNKS_MAP is created. The BUFFER_MAP contains all the images buffers used by the handler to aggregate image chunks from channels and CHUNKS_MAP contains response chunked boolean. When the response is complete, the image InputSteam is added to the queue, the latch count down and the channel closed.

private class TileClientHandler extends SimpleChannelUpstreamHandler {

    private CancellableQueue<Object> queue;
    private CountDownLatch latch;

    public TileClientHandler(final CancellableQueue<Object> queue, final CountDownLatch latch) {
        this.queue = queue;
        this.latch = latch;
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
            BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
        }
        if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
            CHUNKS_MAP.put(ctx.getChannel().getId(), false);
        }
    }

    @Override
    public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
        super.writeComplete(ctx, e);
        if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
            BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
        }
        if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
            CHUNKS_MAP.put(ctx.getChannel().getId(), false);
        }
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        final Integer channelID = ctx.getChannel().getId();
        if (!CHUNKS_MAP.get(channelID)) {
            final HttpResponse response = (HttpResponse) e.getMessage();

            if (response.isChunked()) {
                CHUNKS_MAP.put(channelID, true);

            } else {
                final ChannelBuffer content = response.getContent();
                if (content.readable()) {
                    final ChannelBuffer buf = BUFFER_MAP.get(channelID);
                    buf.writeBytes(content);
                    BUFFER_MAP.put(channelID, buf);
                    messageCompleted(e);

                }
            }
        } else {
            final HttpChunk chunk = (HttpChunk) e.getMessage();
            if (chunk.isLast()) {
                CHUNKS_MAP.put(channelID, false);
                messageCompleted(e);
            } else {
                final ChannelBuffer buf = BUFFER_MAP.get(channelID);
                buf.writeBytes(chunk.getContent());
                BUFFER_MAP.put(channelID, buf);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        e.getCause().printStackTrace();
        latch.countDown();
        e.getChannel().close();
    }

    private void messageCompleted(MessageEvent e) {
        final Integer channelID = e.getChannel().getId();
        if (queue.isCancelled()) {
            return;
        }

        try {
            final ImagePack p = PACK_MAP.get(channelID);
            final ChannelBuffer b = BUFFER_MAP.get(channelID);

            p.setBuffer(new ByteArrayInputStream(b.array()));
            queue.put(p.getTile());
        } catch (Exception ex) {
            LOGGER.log(Level.WARNING, ex.getMessage(), ex);
        }
        latch.countDown();
        e.getChannel().close();
    }
}

My problem is, when I execute this code, I've got these exceptions :

 java.lang.IllegalArgumentException: invalid version format: 3!}@
    at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
    at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
    at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

java.lang.IllegalArgumentException: invalid version format: 
    at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
    at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
    at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.cleanup(ReplayingDecoder.java:546)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.channelDisconnected(ReplayingDecoder.java:449)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
    at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
    at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
    at org.jboss.netty.channel.Channels.close(Channels.java:720)
    at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

22 mars 2012 15:27:31 org.jboss.netty.channel.DefaultChannelPipeline
ATTENTION: An exception was thrown by a user handler while handling an exception event ([id: 0x3cd16610, /172.16.30.91:34315 :> tile.openstreetmap.org/193.63.75.98:80] EXCEPTION: java.lang.IllegalArgumentException: invalid version format: 
java.lang.IllegalStateException: An Executor cannot be shut down from the thread acquired from itself.  Please make sure you are not calling releaseExternalResources() from an I/O worker thread.
    at org.jboss.netty.util.internal.ExecutorUtil.terminate(ExecutorUtil.java:71)
    at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.releaseExternalResources(NioClientSocketChannelFactory.java:171)
    at org.jboss.netty.bootstrap.Bootstrap.releaseExternalResources(Bootstrap.java:324)
    at org.geotoolkit.client.map.CachedPyramidSet$1.countDown(CachedPyramidSet.java:314)
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:514)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
    at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
    at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
    at org.jboss.netty.channel.Channels.close(Channels.java:720)
    at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

And also some NPE appears some times.

java.lang.NullPointerException
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.skipControlCharacters(HttpMessageDecoder.java:409)
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:184)
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)

All these code works fine for one request but some weird stuff append on buffers when many requests where send.

Any ideas what I'm missing here? Thanks.

In my first version, I duplicate bootstrap/handler for each requested images, it works fine but not very optimized.

like image 862
qboileau Avatar asked Mar 22 '12 15:03

qboileau


1 Answers

The problem is that you're sharing a single HttpClientCodec between all your channels. The default pipeline specified in the bootstrap is cloned for all channels, so each channel sees the same instance of each handler. The http codecs are stateful so you're seeing the effects of different responses getting mixed together.

The easiest solution is to pass a ChannelPipelineFactory to the bootstrap. This will be called for each new channel and you can create a pipeline with new instances of HttpClientCodec. There's nothing to stop you using the same instance of TileClientHandler for every pipeline you create if that is how it's intended to work.

I'm curious though. Given that you're making each request concurrently, wouldn't it be easier to just add HttpChunkAggregator upstream of HttpClientCodec and let Netty aggregate all the chunks into a single HttpResponse. Then you just grab the reassembled content from there?

like image 72
johnstlr Avatar answered Oct 14 '22 23:10

johnstlr