
How to consume just one message at a time from RabbitMQ in Node.js

I'm using the amqp.node library to integrate RabbitMQ into my system.

But in the consumer I want to process just one message at a time: process it, acknowledge it, and only then consume the next message from the queue.

The current code is:

// Consumer
open.then(function(conn) {
  var ok = conn.createChannel();
  ok = ok.then(function(ch) {
    ch.assertQueue(q);
    ch.consume(q, function(msg) {
      if (msg !== null) {
        othermodule.processMessage(msg, function(error, response){
          console.log(msg.content.toString());
          ch.ack(msg);
        });
      }
    });
  });
  return ok;
}).then(null, console.warn);

ch.consume hands every message in the queue to the callback at once, and the module's function (called othermodule here) runs asynchronously, so the messages are not processed one after another.

I want to wait for the othermodule function to finish before consuming the next message in the queue.

bitgandtter asked Feb 26 '15 16:02



3 Answers

As of 2018, the RabbitMQ tutorial describes an option for this:

https://www.rabbitmq.com/tutorials/tutorial-two-javascript.html

ch.prefetch(1);

In order to defeat that we can use the prefetch method with the value of 1. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
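
As a rough sketch of how this could apply to the promise-based code in the question: set the prefetch before consuming and acknowledge only once the processing callback fires. This assumes amqplib's promise API; startSerialConsumer is a hypothetical helper name, processMessage stands in for othermodule.processMessage, and rejecting failed messages without requeue is an assumed error policy rather than anything the tutorial prescribes.

```javascript
// Sketch only: `startSerialConsumer` is a hypothetical helper, not part of amqplib.
// `ch` is expected to be an amqplib channel (promise API).
async function startSerialConsumer(ch, queue, processMessage) {
  await ch.assertQueue(queue);

  // Ask the broker to keep at most one unacknowledged message
  // outstanding for this consumer.
  await ch.prefetch(1);

  return ch.consume(queue, function (msg) {
    if (msg === null) return; // the consumer was cancelled by the broker

    // The next delivery arrives only after this message is acked or nacked.
    processMessage(msg, function (error) {
      if (error) {
        // Assumed policy: reject without requeue to avoid a redelivery loop.
        ch.nack(msg, false, false);
      } else {
        ch.ack(msg);
      }
    });
  });
}

// Possible usage (connection URL and queue name are placeholders):
// require('amqplib').connect('amqp://localhost')
//   .then(function (conn) { return conn.createChannel(); })
//   .then(function (ch) {
//     return startSerialConsumer(ch, 'tasks', othermodule.processMessage);
//   })
//   .then(null, console.warn);
```

The ack is what releases the next message, so it has to stay inside the processing callback; acknowledging earlier would let the broker send the next message while the previous one is still being handled.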

Hoang Trinh answered Oct 27 '22 19:10


Following the example here:

https://www.npmjs.com/package/amqplib

// Consumer
function consumer(conn) {
  var ok = conn.createChannel(on_open);
  function on_open(err, ch) {
    if (err != null) bail(err);
    ch.assertQueue(q);
    
    // IMPORTANT: deliver at most one unacknowledged message at a time
    ch.prefetch(1);

    ch.consume(q, function(msg) {
      if (msg !== null) {
        console.log(msg.content.toString());
        ch.ack(msg);
      }
    });
  }
}

Refs: http://www.squaremobius.net/amqp.node/channel_api.html#channel_prefetch

vanduc1102 answered Oct 27 '22 19:10


You need to set a prefetch value as shown in this example:

https://github.com/squaremo/amqp.node/blob/master/examples/tutorials/rpc_server.js#L22

old_sound answered Oct 27 '22 17:10