Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Dead Lettered Message Not Being Consumed in RabbitMQ and Node Using AMQP.Node

I want to receive a message after a certain amount of time in one of my workers. I decided to go with Node and RabbitMQ after discovering so-called dead letter exchanges.

The message seems to get send to the queue in DeadExchange, but the consumer is never receiving the message after the elapsed time in the WorkQueue in the WorkExchange. Either the bindQueue is off, or the dead-letter'ing doesn't work?

I've tried a lot of different values now. Can someone please point out what I'm missing?

var amqp = require('amqplib');
var url = 'amqp://dev.rabbitmq.com';

amqp.connect(url).then(function(conn) {
    //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to.
    return conn.createChannel().then(function(ch) {
        return ch.assertExchange('WorkExchange', 'direct').then(function() {
            return ch.assertQueue('WorkQueue', {
                autoDelete: false,
                durable: true
            })
        }).then(function() {
            return ch.bindQueue('WorkQueue', 'WorkExchange', '');
        }).then(function() {
            console.log('Waiting for consume.');

            return ch.consume('WorkQueue', function(msg) {
                console.log('Received message.');
                console.log(msg.content.toString());
                ch.ack(msg);
            });
        });
    })
}).then(function() {
    //Now send a test message to DeadExchange to a random (unique) queue.
    return amqp.connect(url).then(function(conn) {
        return conn.createChannel();
    }).then(function(ch) {
        return ch.assertExchange('DeadExchange', 'direct').then(function() {
            return ch.assertQueue('', {
                arguments: {
                    'x-dead-letter-exchange': 'WorkExchange',
                    'x-message-ttl': 2000,
                    'x-expires': 10000
                }
            })
        }).then(function(ok) {
            console.log('Sending delayed message');

            return ch.sendToQueue(ok.queue, new Buffer(':)'));
        });
    })
}).then(null, function(error) {
    console.log('error\'ed')
    console.log(error);
    console.log(error.stack);
});

I'm using amqp.node (https://github.com/squaremo/amqp.node) which is amqplib in npm. Although node-amqp (https://github.com/postwait/node-amqp) seems to be so much more popular, it doesn't implement the full protocol and there are quite some outstanding issues regarding reconnecting.

dev.rabbitmq.com is running RabbitMQ 3.1.3.

like image 552
martijndeh Avatar asked Jul 31 '13 11:07

martijndeh


People also ask

How do I read messages from dead-letter queue RabbitMQ?

You need to configure a "dead letter queue" to handle messages that have been rejected or undelivered. Using the RabbitMQ Client library, you can bind a consumer to that configured queue and retrieve the messages from it. From there you decide in code what you want to do to reprocess/reject them completely.

How does RabbitMQ dead-letter queue work?

In RabbitMQ messages can become dead letters when one of the following occurs. The message is rejected or negatively acknowledged by the consumer and the consumer tells RabbitMQ not to requeue the message or the message expires due to a time-to-live option specified on the queue or on the message.

How do I send a message to a dead-letter queue?

To specify a dead-letter queue, you can use the console or the AWS SDK for Java. You must do this for each queue that sends messages to a dead-letter queue. Multiple queues of the same type can target a single dead-letter queue.

How do I create a dead-letter queue in RabbitMQ?

To set the dead letter exchange for a queue, specify the optional x-dead-letter-exchange argument when declaring the queue. The value must be an exchange name in the same virtual host: channel. exchangeDeclare("some.exchange.name", "direct"); Map<String, Object> args = new HashMap<String, Object>(); args.


1 Answers

This is a working code.When a message spends more than ttl in DeadExchange, it is pushed to WorkExchange. The key to success is defining the right routing key. The exchange-queue to which you wish to send post ttl, should be bounded with a routing key(note: not default), and 'x-dead-letter-routing-key' attributes value should match that route-key.

var amqp = require('amqplib');
var url = 'amqp://localhost';

amqp.connect(url).then(function(conn) {
    //Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to.
    return conn.createChannel().then(function(ch) {
        return ch.assertExchange('WorkExchange', 'direct').then(function() {
            return ch.assertQueue('WorkQueue', {
                autoDelete: false,
                durable: true
            })
        }).then(function() {
            return ch.bindQueue('WorkQueue', 'WorkExchange', 'rk1');
        }).then(function() {
            console.log('Waiting for consume.');

            return ch.consume('WorkQueue', function(msg) {
                console.log('Received message.');
                console.log(msg.content.toString());
                ch.ack(msg);
            });
        });
    })
}).then(function() {
    //Now send a test message to DeadExchange to DEQ queue.
    return amqp.connect(url).then(function(conn) {
        return conn.createChannel();
    }).then(function(ch) {
        return ch.assertExchange('DeadExchange', 'direct').then(function() {
            return ch.assertQueue('DEQ', {
                arguments: {
                    'x-dead-letter-exchange': 'WorkExchange',
                    'x-dead-letter-routing-key': 'rk1',
                    'x-message-ttl': 15000,
                    'x-expires': 100000
                }
            })
        }).then(function() {
            return ch.bindQueue('DEQ', 'DeadExchange', '');
        }).then(function() {
            console.log('Sending delayed message');

            return ch.publish('DeadExchange', '', new Buffer("Over the Hills and Far Away!"));
        });
    })
}).then(null, function(error) {
    console.log('error\'ed')
    console.log(error);
    console.log(error.stack);
});
like image 134
Abhinav K Tripathi Avatar answered Sep 23 '22 22:09

Abhinav K Tripathi