I have the following code to declare a queue:
Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(getQueueName(), true,consumer);
and the following to get the next Delivery object and process it:
Delivery delivery = null;
T queue = null;
//loop over, continuously retrieving messages
while(true) {
try {
delivery = consumer.nextDelivery();
queue = deserialise(delivery.getBody());
process(queue);
} catch (ShutdownSignalException e) {
logger.warn("Shutodwon signal received.");
break;
} catch (ConsumerCancelledException e) {
logger.warn("Consumer cancelled exception: {}",e.getMessage());
break;
} catch (InterruptedException e) {
logger.warn("Interuption exception: {}", e);
break;
}
}
The deserialise code. As you can see I'm using Kryo:
public T deserialise(byte[] body) {
Kryo kryo= new Kryo();
Input input = new Input(body);
T deserialised = kryo.readObject(input, getQueueClass());
input.close();
return deserialised;
}
If I run this with a queue containing a large number of objects, after approximatelly 2.7 million objects I get an out of memory exception. I found this originally by running it over night with data going in from JMeter at a rate ~90/s which at first it is consuming without any trouble, but in the morning I noticed a large number in RabbitMQ and an out of memory exception on the consumer. I ran it up again and used the Eclipse Memory Analyzer to determine where this memory was being used. From this I can see that the java.util.concurrent.LinkedBlockingQueue that is referenced by com.rabbitmq.client.QueueingConsumer is growing and growing until it runs out of memory.
Do I need to do anything to tell Rabbit to release resources?
I could increase the heap size but I'm concerned that this is just a short term fix and there might be something in my code that could bite me with a memory leak a few months into production deployment.
The default memory threshold is set to 40% of installed RAM. Note that this does not prevent the RabbitMQ server from using more than 40%, it is merely the point at which publishers are throttled. Erlang's garbage collector can, in the worst case, cause double the amount of memory to be used (by default, 80% of RAM).
Overview. During operation, RabbitMQ nodes will consume varying amount of memory and disk space based on the workload.
A high watermark event will occur when the memory used exceeds the vm_memory_high_watermark setting. In this example, server002 has exceeded the high watermark setting. The main RabbitMQ log file (such as /var/log/rabbitmq/[email protected] on Linux) should contain something like this.
My mistake was that I was setting my channel to auto ack. This mean that every message from Rabbit was getting ack'd (acknologed as being recieved). I have fixed (and tested) this by decalaring the channel to not auto-ack: channel.basicConsume(getQueueName(), false,consumer);
and after I process queue, I then ack the message: consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);
.
This is what my queue decleration now looks like:
Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(getQueueName(), false,consumer);
and the following to process the queue:
Delivery delivery = null;
T queue = null;
//loop over, continuously retrieving messages
while(true) {
try {
delivery = consumer.nextDelivery();
queue = deserialise(delivery.getBody());
process(queue);
consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (ShutdownSignalException e) {
logger.warn("Shutodwon signal received.");
break;
} catch (ConsumerCancelledException e) {
logger.warn("Consumer cancelled exception: {}",e.getMessage());
break;
} catch (InterruptedException e) {
logger.warn("Interuption exception: {}", e);
break;
} catch (IOException e) {
logger.error("Could not ack message: {}",e);
break;
}
}
I can now see in the RabbitMQ Management screen that the messages are being delivered at a very high rate but that they are not being ack'd at that rate. If I then kill my consumer, within about 30s all of those non-ack'd messages are moved back to the Ready queue. One of the improvements that I will make is to set the basicQos value: channel.basicQos(10);
so that there aren't too many messages delivered but non-ack'd. This is desirable because it means that I can fire up another consumer onto the same queue and start processing the queue rather than it all ending up in memory non-ack'd and not available to other consumers.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With