I'm currently playing around with Rabbit-Mq, and am trying to implement a "dead-letter" queue, a queue for failed messages. I've been reading the rabbit documentation: https://www.rabbitmq.com/dlx.html.
and have come up with this example:
internal class Program
{
private const string WorkerExchange = "work.exchange";
private const string RetryExchange = "retry.exchange";
public const string WorkerQueue = "work.queue";
private const string RetryQueue = "retry.queue";
static void Main(string[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(WorkerExchange, "direct");
channel.QueueDeclare
(
WorkerQueue, true, false, false,
new Dictionary<string, object>
{
{"x-dead-letter-exchange", RetryExchange},
// I have tried with and without this next key
{"x-dead-letter-routing-key", RetryQueue}
}
);
channel.QueueBind(WorkerQueue, WorkerExchange, string.Empty, null);
channel.ExchangeDeclare(RetryExchange, "direct");
channel.QueueDeclare
(
RetryQueue, true, false, false,
new Dictionary<string, object> {
{ "x-dead-letter-exchange", WorkerExchange },
{ "x-message-ttl", 30000 },
}
);
channel.QueueBind(RetryQueue, RetryExchange, string.Empty, null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
Thread.Sleep(1000);
Console.WriteLine("Rejected message");
// also tried channel.BasicNack(ea.DeliveryTag, false, false);
channel.BasicReject(ea.DeliveryTag, false);
};
channel.BasicConsume(WorkerQueue, false, consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
Image of queue when publishing to worker queue:
Image of the retry queue:
I feel as though I'm missing some small details but can't seem to find what they are.
Thanks in advance
To set the dead letter exchange for a queue, specify the optional x-dead-letter-exchange argument when declaring the queue. The value must be an exchange name in the same virtual host: channel. exchangeDeclare("some.exchange.name", "direct"); Map<String, Object> args = new HashMap<String, Object>(); args.
To process messages on a dead-letter queue (DLQ), IBM® MQ supplies a default DLQ handler. The handler matches messages on the DLQ against entries in a rules table that you define. Messages can be put on a DLQ by queue managers, message channel agents (MCAs), and applications.
Some messages stored in RabbitMQ queues will expire or be negatively acknowledged by consumers. Instead of silently dropping them, RabbitMQ can be configured to “dead letter” them instead, that is to republish those messages to a special-purpose exchange. Prior to RabbitMQ 3.10 dead lettering has not been safe.
If the dead-letter queue's retention period is 4 days, the message is deleted from the dead-letter queue after 3 days and the ApproximateAgeOfOldestMessage is 3 days. Thus, it is a best practice to always set the retention period of a dead-letter queue to be longer than the retention period of the original queue.
You should define your dead-letter-exchange as fanout
.
There we go: channel.ExchangeDeclare(RetryExchange, "fanout");
If your dead letter exchange is setup as DIRECT you must specify a dead letter routing key. If you just want all your NACKed message to go into a dead letter bucket for later investigation (as I do) then your dead letter exchange should be setup as a FANOUT.
Look at this for more info
Turns out that if a dead letter exchange is a direct
exchange then the queue parameters require a x-dead-letter-routing-key
. Above (in the question) I am using this key in the dictionary to try and route my messages but what I am not doing is adding a route to my binding, here is an updated version of the code that works:
internal class Program
{
private const string WorkerExchange = "work.exchange";
private const string RetryExchange = "retry.exchange";
public const string WorkerQueue = "work.queue";
private const string RetryQueue = "retry.queue";
static void Main(string[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(WorkerExchange, "direct");
channel.QueueDeclare
(
WorkerQueue, true, false, false,
new Dictionary<string, object>
{
{"x-dead-letter-exchange", RetryExchange},
{"x-dead-letter-routing-key", RetryQueue}
}
);
channel.QueueBind(WorkerQueue, WorkerExchange, WorkerQueue, null);
channel.ExchangeDeclare(RetryExchange, "direct");
channel.QueueDeclare
(
RetryQueue, true, false, false,
new Dictionary<string, object>
{
{"x-dead-letter-exchange", WorkerExchange},
{"x-dead-letter-routing-key", WorkerQueue},
{"x-message-ttl", 30000},
}
);
channel.QueueBind(RetryQueue, RetryExchange, RetryQueue, null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
Thread.Sleep(1000);
Console.WriteLine("Rejected message");
channel.BasicNack(ea.DeliveryTag, false, false);
};
channel.BasicConsume(WorkerQueue, false, consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
The difference being that the call to channel.QueueBind(WorkerQueue, WorkerExchange, WorkerQueue, null);
now supplies the routing key to be the same as the queuename, so when the message "dead-letters" it gets routed to the exchange via this key
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With