Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RabbitMQ: how to limit consuming rate

I need to limit the rate of consuming messages from rabbitmq queue.

I have found many suggestions, but most of them offer to use prefetch option. But this option doesn't do what I need. Even if I set prefetch to 1 the rate is about 6000 messages/sec. This is too many for consumer.

I need to limit for example about 70 to 200 messages per second. This means consuming one message every 5-14ms. No simultaneous messages.

I'm using Node.JS with amqp.node library.

like image 812
Alex_Crack Avatar asked Mar 24 '15 06:03

Alex_Crack


2 Answers

Implementing a token bucket might help: https://en.wikipedia.org/wiki/Token_bucket

You can write a producer that produces to the "token bucket queue" at a fixed rate with a TTL on the message (maybe expires after a second?) or just set a maximum queue size equal to your rate per second. Consumers that receive a "normal queue" message must also receive a "token bucket queue" message in order to process the message effectively rate limiting the application.

NodeJS + amqplib Example:

var queueName = 'my_token_bucket';
rabbitChannel.assertQueue(queueName, {durable: true, messageTtl: 1000, maxLength: bucket.ratePerSecond});
writeToken();

function writeToken() {
    rabbitChannel.sendToQueue(queueName, new Buffer(new Date().toISOString()), {persistent: true});
    setTimeout(writeToken, 1000 / bucket.ratePerSecond);
}
like image 183
John Culviner Avatar answered Sep 19 '22 21:09

John Culviner


I've already found a solution.

I use module nanotimer from npm for calculation delays.

Then I calculate delay = 1 / [message_per_second] in nanoseconds.

Then I consume message with prefetch = 1

Then I calculate really delay as delay - [processing_message_time]

Then I make timeout = really delay before sending ack for the message

It works perfectly. Thanks to all

like image 35
Alex_Crack Avatar answered Sep 17 '22 21:09

Alex_Crack