
RabbitMQ with NodeJS - using amqplib to get message count

How do I get the number of messages currently enqueued?

My code is basically the following:

function readQueue() {
    var open = require('amqplib').connect(config.rabbitServer);

    open.then(function (conn) {
        var ok = conn.createChannel();
        ok = ok.then(function (ch) {
            ch.prefetch(config.bulkSize);

            setInterval(function () {
                handleMessages();
            }, config.bulkInterval);

            ch.assertQueue(config.inputQueue);
            ch.consume(config.inputQueue, function (msg) {
                if (msg !== null) {
                    pendingMessages.push(msg);
                }
            });
        });
        return ok;
    }).then(null, console.warn);
}

I found nothing in the documentation or while debugging. I did see a different library that allows this, so I am wondering whether amqplib supports it as well.

AlexD asked Feb 08 '23 10:02


2 Answers

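You can get the queue length with amqplib: the reply to assertQueue includes it.

In my case the queue was declared with durable: true, so the same option has to be passed here. assertQueue fails if the options do not match the existing queue.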

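var amqp = require('amqplib/callback_api');

// amqp_url is the broker URL, defined elsewhere.
amqp.connect(amqp_url, function(err, conn) {
  if (err) throw err;
  conn.createChannel(function(err, ch) {
    if (err) throw err;
    var q = 'task2_queue';

    // The options must match how the queue was originally declared.
    ch.assertQueue(q, {durable: true}, function(err, ok) {
      if (err) throw err;
      console.log(ok); // ok.messageCount holds the queue length
    });
  });
});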

It will return an object like this:

{ queue: 'task2_queue', messageCount: 34, consumerCount: 2 }

For more information: https://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
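
Since the question uses the promise API rather than the callback API, the same lookup might be sketched as below. The helper name getMessageCount and its parameters are made up for illustration, and ch is assumed to be a channel obtained from conn.createChannel().

```javascript
// Sketch only: read the queue length through amqplib's promise API.
// `ch` is assumed to be an amqplib channel; `getMessageCount`, `queueName`
// and `options` are hypothetical names chosen for this example.
function getMessageCount(ch, queueName, options) {
  // assertQueue resolves with { queue, messageCount, consumerCount }.
  return ch.assertQueue(queueName, options).then(function (ok) {
    return ok.messageCount;
  });
}
```

In the question's code this could be called inside the createChannel callback as getMessageCount(ch, config.inputQueue).then(console.log). As far as I know, messageCount only covers messages that are ready for delivery, so messages already handed to a consumer and not yet acknowledged (such as those sitting in pendingMessages) would not be included.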

Deniz Husaj answered Feb 12 '23 09:02


I think the assertQueue method call will return an object that contains the current message count. I don't remember the exact property name off-hand, but it should be in there.

The real trick, though, is that the returned object is a snapshot: the count in it is not updated after the call completes. The only way to get a current message count is to ask the broker again, for example by calling assertQueue again. Each call is a round trip to the broker, so checking too frequently can have performance implications.
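
A rough sketch of such periodic re-checking is shown below. It uses checkQueue, which amqplib documents as returning the same reply as assertQueue without taking options or declaring the queue (it errors the channel if the queue does not exist). The names watchQueueDepth, intervalMs and onCount are invented for this example, and ch is assumed to be a promise-API channel.

```javascript
// Sketch only: poll the queue length at a fixed interval.
// `ch` is assumed to be an amqplib channel (promise API); `watchQueueDepth`,
// `intervalMs` and `onCount` are hypothetical names for this example.
function watchQueueDepth(ch, queueName, intervalMs, onCount) {
  var timer = setInterval(function () {
    // Every tick is one round trip to the broker, so keep the interval modest.
    ch.checkQueue(queueName).then(function (ok) {
      onCount(ok.messageCount);
    }).catch(console.warn);
  }, intervalMs);

  // The caller invokes the returned function to stop polling.
  return function stop() {
    clearInterval(timer);
  };
}
```

For instance, var stop = watchQueueDepth(ch, config.inputQueue, 5000, console.log); would log the count every five seconds until stop() is called. Using a dedicated channel for this may be safer, because a failed checkQueue closes the channel it was issued on.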

Derick Bailey answered Feb 12 '23 09:02