Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Masstransit consumer death - Peek()

Tags:

masstransit

I am curious as to if MassTransit consumers can Peek() a MSMQ queue before actually retrieving the msg.

What the steps/process is:

1) Msg sent to queue

2) Consumers gets it and has to do a DB update -- takes about 5 seconds

3) Consumer has to do second round of updates if the first worked.

My problem is, how can I handle the case that if the first DB update fails that the message stays in the queue (i.e. network problem and can't get to db).

Currently as soon as it reads the msg from the queue it removes it and then it just disappears if the DB updates fail..

Additionally, how can I handle a power failure -- I mean if half way through a 'job' by a consumer, whatever that is (db update or something else) and the power dies etc, how can I re-run the process on the msg in the queue? Lets say the job (in my current instance anyway) is pushing a new row to a table. I mean I can write the code to first check if the row is there and if it is then drop the message and if not then run the task, but how can I get it to re-run the whole process in the first place?

I have read up that I could Peek() the queue and then run the task and then read the queue msg for real and remove it, but I cannot for the life of me figure out if that works with mass transit... bit lost...

Additionally I am aware that Masstransit has .RetryLater but then do I use that in the process? Is it Initially --> When --> Then --> .RetryLater in the saga??

Any pointers would be appeciated

Kindest Regards Robin

EDIT

PS: I am using a saga....

Define(() =>
            {
                RemoveWhen(saga => saga.CurrentState == Completed);

                Initially(
                    When(NewAC)
                        .Then((saga, message) => saga.ProcessPSM(message),
                            InCaseOf<Exception>()
                               .TransitionTo(Problem)                                  
                                )
                        .Then((saga, message) => saga.PostProcessPSM())
                        .Complete()
                    );
                During(Problem,
                    When(Waiting)
                               // NOTE: THIS DOES NOT WORK!!!!
                        .RetryLater()
                    );
                });

The RetryLater throw an error of: "The message cannot be accepted by an existing saga"

I am not sure how else I can access the 'RetryLater".

like image 248
Robin Rieger Avatar asked Dec 21 '12 03:12

Robin Rieger


1 Answers

MassTransit abstracts the concepts of the underlying queues. So Peek is not the solution, but it does have other means for retrying messages. If you are just interested in handling error and failure conditions the following mechanisms are sufficient.

By default if a consumer throws an exception the message will be retried N times:

  • where N is configured on the bus and defaults to 5. It can be changed in the bus initilisation using SetDefaultRetryLimit on the ServiceBusConfigurator
  • Where retried means the message will be added to the end of the queue

If you would like a finer grained approach to error handling you can implement a Context Consumer, catch recoverable or transient exceptions and manually call RetryLater. There is no limit to how many times this can be done as I understand it.

public class RetryConsumer : Consumes<AwesomeMessage>.Context
{

    public void Consume(IConsumeContext<AwesomeMessage> message)
    {
        try
        {
            Console.WriteLine("This is Attempt " + message.RetryCount);
            // Do Something
        }
        catch (SomeTransientException e)
        {
            message.RetryLater();
        }
    }
}
like image 172
Adam Mills Avatar answered Oct 11 '22 20:10

Adam Mills