Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Re-queue message on exception

I'm looking for a solid way of re-queuing messages that couldn't be handled properly - at this time.

I've been looking at http://dotnetcodr.com/2014/06/16/rabbitmq-in-net-c-basic-error-handling-in-receiver/ and it seems that it's supported to requeue messages in the RabbitMQ API.

else //reject the message but push back to queue for later re-try
{
    Console.WriteLine("Rejecting message and putting it back to the queue: {0}", message);
    model.BasicReject(deliveryArguments.DeliveryTag, true);
}

However I'm using EasyNetQ. So wondering how I would do something similar here.

bus.Subscribe<MyMessage>("my_subscription_id", msg => {
    try
    {
        // do work... could be long running
    }
    catch ()
    {
        // something went wrong - requeue message
    }
});

Is this even a good approach? Not ACK the message could cause problems if do work exceeds the wait for ACK timeout by the RabbitMQ server.

like image 741
Snæbjørn Avatar asked Aug 18 '15 15:08

Snæbjørn


2 Answers

So I came up with this solution. Which replaces the default error strategy by EasyNetQ.

public class DeadLetterStrategy : DefaultConsumerErrorStrategy
{
    public DeadLetterStrategy(IConnectionFactory connectionFactory, ISerializer serializer, IEasyNetQLogger logger, IConventions conventions, ITypeNameSerializer typeNameSerializer)
    : base(connectionFactory, serializer, logger, conventions, typeNameSerializer)
    {
    }

    public override AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
    {
        object deathHeaderObject;
        if (!context.Properties.Headers.TryGetValue("x-death", out deathHeaderObject))
            return AckStrategies.NackWithoutRequeue;

        var deathHeaders = deathHeaderObject as IList;

        if (deathHeaders == null)
            return AckStrategies.NackWithoutRequeue;

        var retries = 0;
        foreach (IDictionary header in deathHeaders)
        {
            var count = int.Parse(header["count"].ToString());
            retries += count;
        }

        if (retries < 3)
            return AckStrategies.NackWithoutRequeue;
        return base.HandleConsumerError(context, exception);
    }
}

You replace it like this:

RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>())

You have to use the AdvancedBus so you have to setup everything up manually.

using (var bus = RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>()))
{
    var deadExchange = bus.Advanced.ExchangeDeclare("exchange.text.dead", ExchangeType.Direct);
    var textExchange = bus.Advanced.ExchangeDeclare("exchange.text", ExchangeType.Direct);
    var queue = bus.Advanced.QueueDeclare("queue.text", deadLetterExchange: deadExchange.Name);
    bus.Advanced.Bind(deadExchange, queue, "");
    bus.Advanced.Bind(textExchange, queue, "");

    bus.Advanced.Consume<TextMessage>(queue, (message, info) => HandleTextMessage(message, info));
}

This will dead letter a failed message 3 times. After that it'll go to the default error queue provided by EasyNetQ for error handling. You can subscribe to that queue.

A message is dead lettered when an exception propagates out of your consumer method. So this would trigger a dead letter.

static void HandleTextMessage(IMessage<TextMessage> textMessage, MessageReceivedInfo info)
{
    throw new Exception("This is a test!");
}
like image 177
Snæbjørn Avatar answered Nov 18 '22 22:11

Snæbjørn


to the best of my knowledge, there is no way to manually ack, nack or reject a message with EasyNetQ.

I see you have opened an issue ticket with the EasyNetQ team, regarding this... but no answer, yet.

FWIW, this is a very appropriate thing to do. All of the libraries that I use support this feature set (in NodeJS) and it is common. I'm surprised EasyNetQ doesn't support this.

like image 36
Derick Bailey Avatar answered Nov 18 '22 20:11

Derick Bailey