I have this in a class called "MessageQueueReceive".
public MessageQueueTransaction BlockingReceive(out Message message)
{
    MessageQueueTransaction tran = null;
    message = null;
    tran = new MessageQueueTransaction();
    tran.Begin();
    try
    {
        message = Queue.Receive(new TimeSpan(0, 0, 5), tran);
    }
    catch (MessageQueueException ex)
    {
        // If the exception was a timeout, then just continue
        // otherwise re-raise it (bare "throw" keeps the original stack trace).
        if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
            throw;
    }
    return tran;
}
Then my processing loop has this:-
while (!Abort)
{
    try
    {
        tran = this.Queue.BlockingReceive(out msg);
        if (msg != null)
        {
            // Process message here
            if (tran != null)
                tran.Commit();
        }
    }
    catch (Exception ex)
    {
        if (tran != null)
            tran.Abort();
    }
}
The control panel tool shows that the message queues I'm using are transactional. Journal queue is not enabled.
This code creates the queue:-
private static MessageQueue CreateMessageQueue(string queueName, bool transactional = false)
{
    MessageQueue messageQueue = MessageQueue.Create(queueName, transactional);
    messageQueue.SetPermissions("Administrators", MessageQueueAccessRights.FullControl,
        AccessControlEntryType.Allow);
    return messageQueue;
}
The transactional parameter is set as "true" when this is called.
What I find is that when an exception occurs during the processing of the message, tran.Abort is called but at that point I'd expect the message to be returned to the queue. However, this is not happening and the messages are lost.
Am I missing something obvious? Can anyone see what I'm doing wrong?
Thanks for all the comments. I did re-organise my code as Russell McClure suggested, and I tried to create simple test cases but could not reproduce the problem.
In the end, the problem was not at all where I was looking (how often does that happen?).
In my pipeline, I had a duplicate message checker. The "messages" my system deals with are from remote devices on a WAN, and occasionally messages on the wire are duplicated.
When a message was pulled from the MSMQ, it would pass via the duplicate checker to the database writer. If the database writer failed, the duplicate checker did not remove the message's hash from its table. When the process looped again, it would get the same message from the queue again, because the MSMQ transaction had been rolled back when the database writer failed. However, on the second attempt, the duplicate checker would spot that it had seen the message before, and swallow it silently.
The fix was to make the duplicate checker spot the exception coming from the next link in the chain, and roll back anything it had done too.
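For anyone hitting the same thing, here is a minimal sketch of that idea. It is not my actual code: the class name DuplicateChecker, the Handle method, the use of a string hash and the Action<string> standing in for the database writer are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the duplicate checker described above.
public class DuplicateChecker
{
    // Hashes of messages that have already been passed down the chain.
    private readonly HashSet<string> seen = new HashSet<string>();

    // Next link in the chain, e.g. the database writer (assumed shape).
    private readonly Action<string> next;

    public DuplicateChecker(Action<string> next)
    {
        this.next = next;
    }

    // Returns true if the message was passed on, false if it was
    // dropped as a duplicate.
    public bool Handle(string messageHash)
    {
        // Add returns false when the hash is already in the set.
        if (!seen.Add(messageHash))
            return false;

        try
        {
            next(messageHash);
            return true;
        }
        catch
        {
            // Undo our own bookkeeping so that the redelivered message
            // is not mistaken for a duplicate on the next attempt.
            seen.Remove(messageHash);

            // Rethrow so the caller can still abort the MSMQ transaction.
            throw;
        }
    }
}
```

With this shape, a failure in the writer leaves the checker exactly as it was before the message arrived, so the message that MSMQ hands back after tran.Abort() goes through the pipeline again instead of being swallowed.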
Your queue needs to be created as a transactional queue to get what you want.
EDIT:
Well, if your queue is transactional then that points to the fact that you are mishandling your transaction, although I can't see specifically how it is happening. I would change your BlockingReceive method to return the message. I would move the creation of the MessageQueueTransaction to the outer method. Your code will be much more maintainable if you have the Begin, Commit and Abort method calls in the same method.
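Something along these lines is what I mean. Treat it as a rough sketch rather than a drop-in replacement: the class name MessageProcessor, the TryReceive helper and the Process method are made up for illustration, and I'm assuming the queue passed in is already open and transactional.

```csharp
using System;
using System.Messaging;

public class MessageProcessor
{
    // Assumed to be an already-opened transactional queue.
    private readonly MessageQueue queue;

    public volatile bool Abort;

    public MessageProcessor(MessageQueue queue)
    {
        this.queue = queue;
    }

    public void Run()
    {
        while (!Abort)
        {
            // Begin, Commit and Abort all live in this one method.
            using (var tran = new MessageQueueTransaction())
            {
                tran.Begin();
                try
                {
                    Message msg = TryReceive(tran);
                    if (msg == null)
                    {
                        tran.Abort();   // timed out, nothing to commit
                        continue;
                    }

                    Process(msg);
                    tran.Commit();
                }
                catch (Exception)
                {
                    // Only abort if the transaction is still open;
                    // the message then goes back onto the queue.
                    if (tran.Status == MessageQueueTransactionStatus.Pending)
                        tran.Abort();
                    // Log the exception here rather than swallowing it.
                }
            }
        }
    }

    // Returns the next message, or null if nothing arrived within 5 seconds.
    private Message TryReceive(MessageQueueTransaction tran)
    {
        try
        {
            return queue.Receive(TimeSpan.FromSeconds(5), tran);
        }
        catch (MessageQueueException ex)
            when (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
        {
            return null;
        }
    }

    // Hypothetical placeholder for the real message handling.
    protected virtual void Process(Message msg)
    {
    }
}
```

The receive helper now returns only the message, and a timeout ends its transaction explicitly instead of leaving it pending, which makes it much easier to see that every Begin is matched by exactly one Commit or Abort.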