RabbitMQ QueueingConsumer possible memory leak

I have the following code to declare a queue:

Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
//queueDeclare(queue, durable, exclusive, autoDelete, arguments)
channel.queueDeclare(getQueueName(), false, false, false, null);
consumer = new QueueingConsumer(channel);
//second argument is autoAck, here set to true
channel.basicConsume(getQueueName(), true, consumer);

and the following to get the next Delivery object and process it:

    Delivery delivery = null;
    T queue = null;

    //loop over, continuously retrieving messages
    while(true) {

        try {
            delivery = consumer.nextDelivery();
            queue = deserialise(delivery.getBody());

            process(queue);

        } catch (ShutdownSignalException e) {
            logger.warn("Shutdown signal received.");
            break;
        } catch (ConsumerCancelledException e) {
            logger.warn("Consumer cancelled exception: {}", e.getMessage());
            break;
        } catch (InterruptedException e) {
            logger.warn("Interruption exception: {}", e.getMessage());
            break;
        }
    }
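
As a side note, nextDelivery() blocks indefinitely. The client also has a timed overload, nextDelivery(long timeout), which returns null if nothing arrives within the timeout. A minimal sketch, assuming a hypothetical running flag that isn't in my code:

    //sketch: timed variant of the loop above; a hypothetical 'running'
    //flag lets the consumer be stopped cleanly from outside
    //(exception handling as in the loop above)
    while (running) {
        Delivery delivery = consumer.nextDelivery(1000); //wait at most 1s
        if (delivery == null) {
            continue; //nothing arrived in time; re-check the flag
        }
        process(deserialise(delivery.getBody()));
    }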

Here is the deserialisation code; as you can see, I'm using Kryo:

public T deserialise(byte[] body) {
    Kryo kryo = new Kryo();
    Input input = new Input(body);
    T deserialised = kryo.readObject(input, getQueueClass());
    input.close();

    return deserialised;
}
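
A side note on the deserialisation itself: I allocate a new Kryo instance per message. Kryo instances are reusable (though not thread-safe), so one option is a per-thread instance. A minimal sketch, assuming getQueueClass() returns Class<T> as above; this doesn't explain the queue growth, it just avoids the per-message allocation:

    //sketch: reuse one Kryo instance per consumer thread instead of
    //allocating a new one for every message (Kryo is not thread-safe,
    //so ThreadLocal gives each thread its own instance)
    private static final ThreadLocal<Kryo> KRYO = new ThreadLocal<Kryo>() {
        @Override
        protected Kryo initialValue() {
            return new Kryo();
        }
    };

    public T deserialise(byte[] body) {
        Input input = new Input(body);
        try {
            return KRYO.get().readObject(input, getQueueClass());
        } finally {
            input.close(); //close even if readObject throws
        }
    }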

If I run this with a queue containing a large number of objects, I get an out-of-memory exception after approximately 2.7 million objects. I originally found this by running it overnight with data coming in from JMeter at ~90 messages/s, which the consumer initially handled without any trouble; in the morning, though, I noticed a large backlog of messages in RabbitMQ and an out-of-memory exception on the consumer. I ran it again and used the Eclipse Memory Analyzer to determine where the memory was being used. From this I can see that the java.util.concurrent.LinkedBlockingQueue referenced by com.rabbitmq.client.QueueingConsumer grows and grows until it runs out of memory.

Do I need to do anything to tell Rabbit to release resources?

I could increase the heap size, but I'm concerned that this is just a short-term fix and that there might be something in my code that could bite me with a memory leak a few months into a production deployment.

Asked Oct 02 '12 by Arthur

People also ask

How much RAM does RabbitMQ use?

The default memory threshold is set to 40% of installed RAM. Note that this does not prevent the RabbitMQ server from using more than 40%; it is merely the point at which publishers are throttled. Erlang's garbage collector can, in the worst case, cause double the amount of memory to be used (by default, 80% of RAM).

Does RabbitMQ use memory?

During operation, RabbitMQ nodes will consume varying amounts of memory and disk space based on the workload.

What is high watermark in RabbitMQ?

A high watermark event will occur when the memory used exceeds the vm_memory_high_watermark setting. In this example, server002 has exceeded the high watermark setting. The main RabbitMQ log file (such as /var/log/rabbitmq/rabbit@server002.log on Linux) should contain something like this.


1 Answer

My mistake was that I was setting my channel to auto-ack. This meant that every message from Rabbit was ack'd (acknowledged as received) the moment it was delivered. I have fixed (and tested) this by declaring the channel to not auto-ack: channel.basicConsume(getQueueName(), false, consumer); and after I process queue, I then ack the message: consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);.

This is what my queue declaration now looks like:

        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(getQueueName(), false, false, false, null);
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(getQueueName(), false, consumer);

and the following to process the queue:

    Delivery delivery = null;
    T queue = null;

    //loop over, continuously retrieving messages
    while(true) {

        try {
            delivery = consumer.nextDelivery();
            queue = deserialise(delivery.getBody());
            process(queue);
            consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        } catch (ShutdownSignalException e) {
            logger.warn("Shutdown signal received.");
            break;
        } catch (ConsumerCancelledException e) {
            logger.warn("Consumer cancelled exception: {}", e.getMessage());
            break;
        } catch (InterruptedException e) {
            logger.warn("Interruption exception: {}", e.getMessage());
            break;
        } catch (IOException e) {
            logger.error("Could not ack message: {}", e.getMessage());
            break;
        }
    }

I can now see in the RabbitMQ management screen that messages are being delivered at a very high rate, but that they are not being ack'd at that rate. If I then kill my consumer, within about 30 seconds all of those un-ack'd messages are moved back to the Ready queue. One improvement I will make is to set the basicQos value: channel.basicQos(10); so that there aren't too many messages delivered but not yet ack'd. This is desirable because it means I can fire up another consumer on the same queue and have it start processing, rather than everything ending up in memory un-ack'd and unavailable to other consumers.
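
For completeness, here is where that basicQos call would slot into my declaration code; a sketch only, and the prefetch value of 10 is arbitrary:

        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(getQueueName(), false, false, false, null);
        channel.basicQos(10); //at most 10 un-ack'd messages in flight at once
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(getQueueName(), false, consumer);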

Answered Sep 20 '22 by Arthur