Is there a way to specify the wait time of retrying a message?

Tags: rebus

Is there a way to specify the wait time of retrying a message for a particular exception?

For example, if an object is in the SomethingInProgress status, the handler throws a SomethingInProgressException, and I want the message to be retried after 40 minutes. Or is it more appropriate to raise a SomethingInProgressEvent and use bus.Defer?

Yin asked Jan 11 '23 01:01



2 Answers

This is part of the reason why Rebus does not have the concept of second-level retries - I've simply not seen any way that this function could be created in a way that was generic and still flexible enough.

In short: no, there's no (built-in) way of varying the time between retries for a particular exception. In fact, there's no way to configure a wait time between retries at all. Failing messages are retried as fast as possible and, if they keep failing, are moved to the error queue to avoid "clogging up the pipes".

In your case, I suggest you do something like this:

public void Handle(MyMessage message) {
    // Read our custom attempt counter from the headers (0 on the first delivery)
    var headers = MessageContext.GetCurrent().Headers;
    var deliveryAttempt = headers.ContainsKey("attempt_no") 
        ? Convert.ToInt32(headers["attempt_no"]) 
        : 0;

    try {
        DoWhateverWithThe(message);
    } catch(OneKindOfException) {
        // Too many attempts: give up and forward to the error queue
        if (deliveryAttempt > 5) {
            bus.Advanced.Routing.ForwardCurrentMessage("error");
            return;
        }

        // Otherwise bump the counter and retry in 20 seconds
        bus.AttachHeader(message, "attempt_no", deliveryAttempt + 1);
        bus.Defer(TimeSpan.FromSeconds(20), message);
    } catch(AnotherKindOfException) {
        // Too many attempts: give up and forward to the error queue
        if (deliveryAttempt > 5) {
            bus.Advanced.Routing.ForwardCurrentMessage("error");
            return;
        }

        // This kind of failure gets a longer wait: retry in 2 minutes
        bus.AttachHeader(message, "attempt_no", deliveryAttempt + 1);
        bus.Defer(TimeSpan.FromMinutes(2), message);
    }
}

I just wrote this off the top of my head, so I'm not 100% certain that it actually compiles, but the gist of it is this: we track how many delivery attempts we've made in a custom header on the message, bus.Defer the message for an appropriate time span after each failed delivery attempt, and immediately forward the message to the error queue once our maximum number of delivery attempts has been exceeded.

I hope that makes sense :)

mookid8000 answered May 26 '23 13:05



A more recent example of how to do this is:

public async Task Handle(IFailed<MyMessage> message)
{
    var maxAttempts = 10;
    var optionalHeaders = new Dictionary<string, string>();
    if (message.Headers != null && message.Headers.ContainsKey("attemptNumber"))
    {
        // increment the attempt number
        var attemptNumber = int.Parse(message.Headers["attemptNumber"]);
        attemptNumber++;
        optionalHeaders.Add("attemptNumber", attemptNumber.ToString());
        if (attemptNumber > maxAttempts)
        {
            // log I give up message, message will move to dead queue
            return;
        }
    }
    else
        optionalHeaders.Add("attemptNumber", "1");

    // if message failed to process, defer processing for 5 minutes and try again
    await Bus.Defer(TimeSpan.FromMinutes(5), message.Message, optionalHeaders);
}
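
One thing worth spelling out: as far as I know, a handler for IFailed<MyMessage> is only invoked when second-level retries are enabled in the bus configuration. The fragment below is a minimal sketch of that setup, assuming a Rebus version where the option is called SimpleRetryStrategy (newer versions may name it differently). The in-memory transport, the queue name "my-queue", and the activator are placeholders for illustration, not part of the original answer.

```csharp
using Rebus.Activation;
using Rebus.Config;
using Rebus.Retry.Simple;
using Rebus.Transport.InMem;

// Hypothetical setup: swap the in-memory transport for your real one.
var activator = new BuiltinHandlerActivator();

Configure.With(activator)
    .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "my-queue"))
    // Without this flag, failed messages go straight to the error queue
    // and the IFailed<MyMessage> handler is never called.
    .Options(o => o.SimpleRetryStrategy(secondLevelRetriesEnabled: true))
    .Start();
```

With this in place, the message is first retried immediately in the usual way. Only when those attempts are exhausted is it dispatched wrapped in IFailed<MyMessage>, which is where the Defer call above takes over.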
Michael Brown answered May 26 '23 13:05
