
Proper use of Message Queue on 2008R2

I am not a programmer, but I am trying to help our programmers by giving them some guidance. We no longer have any in-house expertise on MSMQ. We are trying to use it to integrate some functions with a scheduling application.

The scheduling app fires off a job by making a web call through a custom-built DLL, which calls the web app's URL. The web app runs its task and sends updates about that task to a website, and the website writes each update to the queue as a message. The DLL that made the original call monitors the queue for messages carrying the label assigned to that job. When it receives the final status message, it closes.

We get the following exception every few hours. We run close to 100 jobs per hour that use this method. In the code listed at the bottom, the jobid corresponds to the label of the message in the message queue. Each job is issued a jobid at the start and uses it as the label for every message it sends to MSMQ for that job.

 System.Messaging.MessageQueueException (0x80004005): Message that the cursor is currently pointing to has been removed from the queue by another process or by another call to Receive without the use of this cursor.
  at System.Messaging.MessageQueue.ReceiveCurrent(TimeSpan timeout, Int32 action, CursorHandle cursor, MessagePropertyFilter filter, MessageQueueTransaction internalTransaction, MessageQueueTransactionType transactionType)
  at System.Messaging.MessageEnumerator.get_Current() 

Here is the code for it.

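        while ( running )
        {
            // System.Console.WriteLine( "Begin Peek" );
            // Peek() blocks until at least one message is in the queue; it does not remove the message.
            messageQueue.Peek();
            //System.Console.WriteLine( "End Peek" );
            // Read every message property so that Label, Id and the body are available below.
            messageQueue.MessageReadPropertyFilter.SetAll();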

            using ( MessageEnumerator enumerator = messageQueue.GetMessageEnumerator2() )
            {
                enumerator.Reset();

                while ( enumerator.MoveNext() )
                {
                    Message msg = enumerator.Current;

                    if ( msg.Label.Equals( this.jobid ) )
                    {
                        StringBuilder sb = new StringBuilder();
                        /*
                        try
                        {
                            sb.Append( "Message Source: " );
                            //sb.Append( msg.SourceMachine );
                            sb.Append( " Sent: " );
                            sb.Append( msg.SentTime );
                            sb.Append( " Label " );
                            sb.Append( msg.Label );
                            sb.Append( " ID: " );
                            sb.Append( msg.Id );
                            sb.Append( " CorrelationID: " );
                            sb.Append( msg.CorrelationId );
                            sb.Append( " Body Type: " );
                            sb.Append( msg.BodyType );
                        }
                        catch ( Exception )
                        {
                            throw;
                        }
                        finally
                        {
                            System.Console.WriteLine( sb.ToString() );
                        }
                        */
                        //System.Console.WriteLine( "Receiving Message started" );
                        using ( Message message = messageQueue.ReceiveById( msg.Id ) )
                        {
                            //System.Console.WriteLine( "Receiving Message Complete" );
                            //sb = new StringBuilder();
                            string bodyText = string.Empty;

                            try
                            {
                                System.IO.StringWriter sw = new System.IO.StringWriter( sb );
                                System.IO.StreamReader sr = new System.IO.StreamReader( message.BodyStream );

                                while ( !sr.EndOfStream )
                                {
                                    sw.WriteLine( sr.ReadLine() );
                                }
                                sr.Close();
                                sw.Close();
                                bodyText = ( string ) FromXml( sb.ToString(), typeof( string ) );
                                int indx = bodyText.IndexOf( ',' );
                                string tokens = bodyText.Substring( indx + 1 );
                                indx = tokens.IndexOf( ',' );
                                string command = tokens.Substring( 0, indx );
                                tokens = tokens.Substring( indx + 1 );
                                if ( command.Equals( COMMAND_STARTED ) )
                                {
                                    System.Console.WriteLine( "STARTED " + tokens );
                                }
                                else if ( command.Equals( COMMAND_UPDATE ) )
                                {
                                    System.Console.WriteLine( tokens );
                                }
                                else if ( command.Equals( COMMAND_ENDED_OK ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Success" );
                                    finalResults = new FinalResults( 0, 0, "Success" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_WARNING ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Warning Issued" );
                                    finalResults = new FinalResults( 1, 1, "Warning" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_FAIL ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Failure" );
                                    finalResults = new FinalResults( 2, 16, "Failure" );
                                    running = false;
                                }
                            }
                            catch ( Exception )
                            {
                                throw;
                            }
                            finally
                            {
                                //System.Console.WriteLine( "Body: " + bodyText );
                            }
                        }
                    }
                }
            }
        }

        return finalResults;
    }

    MessageQueue messageQueue = null;
    string webServiceURL = "";
    Dictionary<string, string> parms = new Dictionary<string, string>();
    string jobid = "NONE";
ddjammin asked Mar 23 '11 21:03



1 Answer

kprobst's explanation is likely what is happening. Even if you can see that this particular message is still in the queue, the cursor is invalidated as soon as a different application (or a different instance of the same application) receives any message from this queue.

Inherently this code is not designed to work if multiple processes are feeding off the same queue.
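
One way to make the loop tolerate other readers is to treat the cursor error as a signal to rescan instead of letting it escape. The following is only a minimal sketch under some assumptions: `JobMessageReader`, `ShouldRescan` and `TryReceiveForJob` are hypothetical names that do not appear in the question, the queue is assumed to be non-transactional, and `Label` and `Id` are assumed to be included in the queue's `MessageReadPropertyFilter` (the question's code calls `SetAll()`).

```csharp
using System;
using System.Messaging;

// Hypothetical helper for illustration; not part of the original code.
static class JobMessageReader
{
    // Error codes that only mean "another reader got there first" or
    // "nothing arrived in time". Anything else is treated as a real failure.
    public static bool ShouldRescan(MessageQueueErrorCode code)
    {
        return code == MessageQueueErrorCode.MessageAlreadyReceived
            || code == MessageQueueErrorCode.IOTimeout;
    }

    // Returns the next message whose Label equals jobId, or null when none
    // could be taken on this pass (the caller should simply try again).
    public static Message TryReceiveForJob(MessageQueue queue, string jobId)
    {
        try
        {
            using (MessageEnumerator enumerator = queue.GetMessageEnumerator2())
            {
                while (enumerator.MoveNext())
                {
                    // Current peeks at the message under the cursor; this is
                    // the call that throws in the question's stack trace.
                    Message peeked = enumerator.Current;

                    if (peeked.Label == jobId)
                    {
                        // Zero timeout: do not wait if another reader has
                        // removed the message since it was peeked.
                        return queue.ReceiveById(peeked.Id, TimeSpan.Zero);
                    }
                }
            }
        }
        catch (MessageQueueException ex)
        {
            // Rethrow anything that is not one of the benign codes above.
            if (!ShouldRescan(ex.MessageQueueErrorCode))
            {
                throw;
            }
        }

        return null;
    }
}
```

The caller would keep calling `TryReceiveForJob` until the final status message arrives, pausing briefly whenever it returns null. This only hides the symptom: every reader still scans the whole shared queue, so giving each consumer its own queue may be the cleaner design if that is an option.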

Naraen answered Oct 18 '22 23:10