I am curious whether MassTransit consumers can Peek() an MSMQ queue before actually retrieving the message.
The process is:
1) Msg sent to queue
2) A consumer gets it and has to do a DB update, which takes about 5 seconds
3) Consumer has to do second round of updates if the first worked.
My problem is: if the first DB update fails (e.g. a network problem means the DB cannot be reached), how can I make sure the message stays in the queue?
Currently, as soon as the consumer reads the message from the queue, the message is removed, so it simply disappears if the DB updates fail.
Additionally, how can I handle a power failure? If a consumer is halfway through a 'job' (a DB update or something else) and the power dies, how can I re-run the process on the message in the queue? Let's say the job (in my current case, anyway) is pushing a new row to a table. I can write the code to first check whether the row is there, drop the message if it is, and run the task if it is not, but how can I get the whole process to re-run in the first place?
I have read that I could Peek() the queue, run the task, and then read the queue message for real and remove it, but I cannot for the life of me figure out whether that works with MassTransit. I'm a bit lost.
Additionally, I am aware that MassTransit has .RetryLater, but how do I use it in the process? Is it Initially --> When --> Then --> .RetryLater in the saga?
Any pointers would be appreciated.
Kindest regards, Robin
EDIT
PS: I am using a saga....
Define(() =>
{
    RemoveWhen(saga => saga.CurrentState == Completed);
    Initially(
        When(NewAC)
            .Then((saga, message) => saga.ProcessPSM(message),
                InCaseOf<Exception>()
                    .TransitionTo(Problem)
            )
            .Then((saga, message) => saga.PostProcessPSM())
            .Complete()
    );
    During(Problem,
        When(Waiting)
            // NOTE: THIS DOES NOT WORK!!!!
            .RetryLater()
    );
});
The RetryLater call throws an error: "The message cannot be accepted by an existing saga"
I am not sure how else I can access RetryLater.
MassTransit abstracts the concepts of the underlying queues, so Peek is not the solution, but it does have other means of retrying messages. If you are just interested in handling error and failure conditions, the following mechanisms are sufficient.
By default, if a consumer throws an exception, the message will be retried N times.
If you would like a finer-grained approach to error handling, you can implement a Context Consumer, catch recoverable or transient exceptions, and manually call RetryLater. As I understand it, there is no limit to how many times this can be done.
public class RetryConsumer : Consumes<AwesomeMessage>.Context
{
    public void Consume(IConsumeContext<AwesomeMessage> message)
    {
        try
        {
            Console.WriteLine("This is Attempt " + message.RetryCount);
            // Do Something
        }
        catch (SomeTransientException e)
        {
            message.RetryLater();
        }
    }
}
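Because a retried (or redelivered) message can reach the consumer more than once, the work itself should be safe to repeat, which is the "check if the row is there" idea from the question. Below is a minimal sketch of that check, written without any MassTransit types so it stands on its own. NewRowMessage, InMemoryTable and IdempotentHandler are hypothetical names invented for this illustration, and the in-memory dictionary stands in for the real table. Against a real database the check and the insert would need to be atomic (for example via a unique key), which this sketch does not attempt.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message type for illustration; not part of MassTransit.
public class NewRowMessage
{
    public Guid RowId { get; set; }
    public string Payload { get; set; }
}

// Hypothetical stand-in for the database table, keyed by row id.
public class InMemoryTable
{
    private readonly Dictionary<Guid, string> _rows = new Dictionary<Guid, string>();

    public bool Exists(Guid id) { return _rows.ContainsKey(id); }
    public void Insert(Guid id, string payload) { _rows.Add(id, payload); }
    public int Count { get { return _rows.Count; } }
}

public class IdempotentHandler
{
    private readonly InMemoryTable _table;

    public IdempotentHandler(InMemoryTable table) { _table = table; }

    // Returns true if the row was inserted, false if an earlier attempt
    // had already inserted it and there was nothing left to do.
    public bool Handle(NewRowMessage message)
    {
        if (_table.Exists(message.RowId))
        {
            return false;
        }
        _table.Insert(message.RowId, message.Payload);
        return true;
    }
}

public static class Program
{
    public static void Main()
    {
        var table = new InMemoryTable();
        var handler = new IdempotentHandler(table);
        var message = new NewRowMessage { RowId = Guid.NewGuid(), Payload = "row data" };

        Console.WriteLine(handler.Handle(message)); // first delivery: True
        Console.WriteLine(handler.Handle(message)); // same message again: False
        Console.WriteLine(table.Count);             // still one row: 1
    }
}
```

In a consumer like the one above, the body of Handle would sit where "Do Something" is, so a message that is retried after a partial failure only inserts the row if it is not already there.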