I have an array of threads that retrieve messages from a private message queue, deserialize them into LogEntry objects, and store each object's properties in a SQL Server database table named logentry.
Here is my code for creating and starting the threads.
try
{
    for (int i = 0; i < threads.Length; i++)
    {
        threads[i] = new Thread(new ThreadStart(this.logEntriesToDatabase));
        threads[i].Start();
    }
}
catch (ThreadStateException ex)
{
    MessageBox.Show(ex.Message, "Error", MessageBoxButtons.OK, MessageBoxIcon.Error);
    return;
}
catch (OutOfMemoryException ex)
{
    MessageBox.Show("Not Enough Memory Please Close Other Applications To Continue", "Error", MessageBoxButtons.OK, MessageBoxIcon.Error);
    return;
}
Each thread executes the function logEntriesToDatabase():
while (true)
{
    #region Retrieves Message from Message Queue and Deserializes it to a Log Entry Object.
    #region Sleep Time for Current Thread
    Thread.Sleep(180);
    #endregion
    #region Check to See Whether Queue Is Empty. If so go back to start of while loop
    if (q1.GetAllMessages().Length == 0)
    {
        continue;
    }
    #endregion
    #region Message retrieval and Deserialization Code
    System.Messaging.Message m = this.q1.Receive();
    m.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
    LogEntry lg = BinaryLogFormatter.Deserialize(m.Body.ToString());
    #endregion
    #endregion
    #region Insert Log Entry Into Database
    #region Define a new SQL Connection with username and password specified in App.Config, an SQL Transaction and database queries
    SqlConnection conn = new SqlConnection(ConfigurationManager.ConnectionStrings["LogReader"].ConnectionString);
    SqlTransaction transaction;
    string query_insert_into_logentry = "INSERT INTO logentry"
        + "(message, priority, processname, severity, accountid, ipaddress, servername, servertype, timestamp)"
        + "VALUES ('" + lg.Message + "'," + lg.Priority + ",'" + lg.AppDomainName + "','"
        + lg.Severity.ToString() + "','"
        + lg.ExtendedProperties["AccountID"].ToString() + "','"
        + lg.ExtendedProperties["IpAddress"].ToString() + "','"
        + lg.ExtendedProperties["ServerName"].ToString() + "','"
        + lg.ExtendedProperties["ServerType"].ToString() + "','"
        + lg.TimeStamp.ToString() + "')";
    string query_insert_into_category = "INSERT INTO category (category) VALUES ('" + lg.Categories.First().ToString() + "')";
    #endregion
    #region Begins and Terminates Transaction and Closes the SQL Connection. Catches any SQL Exception Thrown and Displays It
    try
    {
        conn.Open();
        transaction = conn.BeginTransaction();
        new SqlCommand(query_insert_into_logentry, conn, transaction).ExecuteNonQuery();
        new SqlCommand(query_insert_into_category, conn, transaction).ExecuteNonQuery();
        transaction.Commit();
        conn.Close();
    }
    catch (SqlException ex)
    {
        MessageBox.Show(ex.Message);
        return;
    }
    #endregion
    #endregion
}
Now, whenever I run this program, it hangs the moment the message queue becomes empty, and I can't figure out why. I tried passing a TimeSpan to q1.Receive(), but that did not work. I also call Thread.Sleep with a time of 180 ms, but it still hangs. Maybe it's because q1.Receive() puts the current thread into a blocking state when it encounters an empty queue.
Please help; I am open to ideas.
Instead of synchronously reading messages in a tight loop and blocking multiple threads, you can read messages asynchronously using MessageQueue.BeginReceive/EndReceive. A similar question was asked here.
If you are using .NET 4.0 or later, you can create a Task from the BeginReceive/EndReceive pair and process the message using ContinueWith, without creating a new thread. In .NET 4.5 you can use the async/await keywords to make processing even simpler, e.g.:
private async Task<Message> MyReceiveAsync()
{
    MessageQueue queue = new MessageQueue();
    ...
    var message = await Task.Factory.FromAsync<Message>(
        queue.BeginReceive(),
        queue.EndReceive);
    return message;
}

public async Task LogToDB()
{
    while (true)
    {
        var message = await MyReceiveAsync();
        SaveMessage(message);
    }
}
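Even though LogToDB uses while(true), the loop executes asynchronously: each await releases the thread while the receive is pending instead of blocking it, and the loop resumes only when a message arrives.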
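On .NET 4.0, where await is not available, the same receive loop can be sketched with ContinueWith, as mentioned above. This is only an illustration: the QueueReader class, the ReceiveNext method, and the saveMessage callback are hypothetical names, and the error handling is deliberately minimal.

```csharp
using System;
using System.Messaging;
using System.Threading.Tasks;

class QueueReader
{
    private readonly MessageQueue queue;
    // Hypothetical callback that stands in for SaveMessage.
    private readonly Action<Message> saveMessage;

    public QueueReader(MessageQueue queue, Action<Message> saveMessage)
    {
        this.queue = queue;
        this.saveMessage = saveMessage;
    }

    // Starts one asynchronous receive. When it completes, the continuation
    // processes the message and starts the next receive.
    public void ReceiveNext()
    {
        Task<Message> receive = Task.Factory.FromAsync<Message>(
            queue.BeginReceive(),
            queue.EndReceive);

        receive.ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                // Report the failure and stop instead of retrying forever.
                Console.Error.WriteLine(t.Exception);
                return;
            }
            saveMessage(t.Result);
            ReceiveNext();
        });
    }
}
```

No thread is blocked while the queue is empty; the continuation runs only once BeginReceive completes.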
To end the loop, you can pass a CancellationToken to LogToDB and end processing cooperatively:
public async Task LogToDB(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        var message = await MyReceiveAsync();
        SaveMessage(message);
    }
}
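A minimal sketch of how a caller might start and stop such a loop follows. To keep it runnable without a message queue, it swaps in FakeReceiveAsync, a hypothetical stand-in for MyReceiveAsync that simply waits 100 ms, and replaces SaveMessage with a console write.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    // Hypothetical stand-in for MyReceiveAsync: pretends a message
    // arrives every 100 ms so the sketch runs without MSMQ.
    static async Task<string> FakeReceiveAsync()
    {
        await Task.Delay(100);
        return "message";
    }

    static async Task LogToDB(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            string message = await FakeReceiveAsync();
            Console.WriteLine("Saved " + message); // stands in for SaveMessage
        }
    }

    static void Main()
    {
        using (var cts = new CancellationTokenSource())
        {
            Task worker = LogToDB(cts.Token); // starts the loop without a dedicated thread
            Thread.Sleep(500);                // let it process a few messages
            cts.Cancel();                     // request cooperative shutdown
            worker.Wait();                    // wait for the loop to observe the token and exit
        }
    }
}
```

Note that the token is checked only between messages, so with a real queue the loop ends after the pending receive completes, not at the instant Cancel is called.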
This way you avoid creating multiple threads and timers.
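If the blocking, thread-per-reader design from the question has to stay, the likely cause of the hang is that Receive() without arguments blocks until a message arrives. The GetAllMessages() check cannot prevent this when several threads share the queue, because another thread can take the last message between the check and the Receive call. Receive(TimeSpan) avoids the indefinite wait, but it throws a MessageQueueException when the timeout expires, and that exception has to be caught. A sketch, where QueueHelper and TryReceive are hypothetical names:

```csharp
using System;
using System.Messaging;

static class QueueHelper
{
    // Returns the next message, or null if none arrives within the timeout.
    public static Message TryReceive(MessageQueue queue, TimeSpan timeout)
    {
        try
        {
            return queue.Receive(timeout);
        }
        catch (MessageQueueException ex)
        {
            // An expired timeout is reported as IOTimeout; treat it as "queue empty".
            if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
            {
                return null;
            }
            throw; // any other queue error is a real failure
        }
    }
}
```

With a helper like this, each thread's loop can skip the GetAllMessages() check and the Thread.Sleep call, and simply continue when TryReceive returns null.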