Just 20 messages per second! That is all I got! Here is the code that peeks 50 messages from the queue and receives them in parallel using ReceiveById. The total number of messages in the queue is 500. I have tested other numbers too, but the upper limit is always 20 messages per second. Am I doing something completely wrong?
Edit 1:
1 - I need the queue to be recoverable. The interesting part is that even if I set the recoverable option to false, the upper limit is still 20 messages/sec.
2 - I am forced to use MSMQ here because there are some legacy apps involved. But if this code is correct and this limit of 20 messages/sec really exists, I can persuade the group to switch. So any recommendation (based on actual experience) for replacing MSMQ is really welcome (and please note that we need to persist our messages in case of any failure of any kind).
3 - I have set the number of threads in the ThreadPool to a high number in case it helps, but in this code it actually causes only 100 - 200 threads to be created. I have tested different numbers from 50 to 10000 and it made no difference.
4 - A new MessageQueue is created in each task because ReceiveById is not thread safe.
5 - As one can see in the code, the message size is very small; it's just a string plus an int.
Edit 2: [Very Strange New Result]
I've played with every bit of this code and found this: if I comment out the line singleLocal.UseJournalQueue = false; in my task, I can read up to 1200 messages per second. Not impressive, but acceptable in my case. The strange part is that the default value of UseJournalQueue is false; why should setting it to false again make such a difference in performance?
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Messaging;
    using System.Threading;
    using System.Threading.Tasks;

    // Not shown in the original post; shape inferred from its usage below
    // ("just a string plus an int").
    public class Data
    {
        public string Name { get; set; }
        public int Value { get; set; }
    }

    static partial class Program
    {
        static void Main(string[] args)
        {
            ThreadPool.SetMaxThreads(15000, 30000);
            ThreadPool.SetMinThreads(10000, 20000);

            var qName = @".\private$\deep_den";
            if (!MessageQueue.Exists(qName))
            {
                var q = MessageQueue.Create(qName);
            }

            var single = new MessageQueue(qName);
            single.UseJournalQueue = false;
            single.DefaultPropertiesToSend.AttachSenderId = false;
            single.DefaultPropertiesToSend.Recoverable = true;
            single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

            // Send phase: put 500 small messages on the queue and time it.
            var count = 500;
            var watch = new Stopwatch();
            watch.Start();
            for (int i = 0; i < count; i++)
            {
                var data = new Data { Name = string.Format("name_{0}", i), Value = i };
                single.Send(new Message(data));
            }
            watch.Stop();
            Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
            Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds);

            // Receive phase: peek up to 50 messages, then receive them in parallel.
            var enu = single.GetMessageEnumerator2();
            watch.Reset();
            watch.Start();
            while (Interlocked.Read(ref __counter) < count)
            {
                var list = new List<Message>();
                var peekCount = 50;
                while (peekCount > 0 && enu.MoveNext(TimeSpan.FromMilliseconds(10)))
                {
                    try
                    {
                        list.Add(enu.Current);
                        peekCount--;
                    }
                    catch (Exception ex2)
                    {
                        Trace.WriteLine(ex2.ToString());
                        break;
                    }
                }

                var tlist = new List<Task>();
                foreach (var message in list)
                {
                    var stupid_closure = message;
                    var t = new Task(() =>
                    {
                        // One MessageQueue per task, because ReceiveById is not thread safe.
                        using (var singleLocal = new MessageQueue(qName))
                        {
                            singleLocal.UseJournalQueue = false; // the line discussed in Edit 2
                            singleLocal.DefaultPropertiesToSend.AttachSenderId = false;
                            singleLocal.DefaultPropertiesToSend.Recoverable = true;
                            singleLocal.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });
                            try
                            {
                                // processing the message and insert it into database
                                // workflow completed here, so we can safely remove the message from queue
                                var localM = singleLocal.ReceiveById(stupid_closure.Id);
                                var localSample = (Data)localM.Body;
                                Interlocked.Increment(ref __counter);
                                Console.WriteLine(Interlocked.Read(ref __counter));
                            }
                            catch (MessageQueueException ex)
                            {
                                if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                                    Trace.WriteLine(ex.ToString());
                            }
                            catch (Exception ex2)
                            {
                                Trace.WriteLine(ex2.ToString());
                            }
                        }
                    }, TaskCreationOptions.PreferFairness);
                    tlist.Add(t);
                }

                foreach (var t in tlist) t.Start();
                Task.WaitAll(tlist.ToArray());
                list.Clear();
            }
            watch.Stop();
            Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
            Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds);
            Console.WriteLine("press any key to continue ...");
            Console.ReadKey();
        }

        static long __counter = 0;
    }
Kaveh, the MessageQueue constructor you are using sets the UseJournalQueue property to true if the Message Queuing object's journal setting is enabled. Somehow it thinks that the journal setting of .\private$\deep_den is enabled. EDIT - Are you using a pre-created queue?
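One way to test this is to open the queue and read the property without assigning it. The following is only a minimal sketch: it assumes a .NET Framework project with a reference to System.Messaging and reuses the queue path from the question; the class name JournalCheck is made up for illustration. As far as I can tell from the documentation, assigning UseJournalQueue modifies the queue itself rather than only the local object, which might be why repeating the assignment in every task is so expensive, so the sketch deliberately only reads the value.

```csharp
using System;
using System.Messaging;

// Hypothetical helper, not part of the original post.
static class JournalCheck
{
    static void Main()
    {
        var qName = @".\private$\deep_den";

        if (!MessageQueue.Exists(qName))
        {
            Console.WriteLine("queue {0} does not exist", qName);
            return;
        }

        using (var queue = new MessageQueue(qName))
        {
            // Read-only access: report what the queue's journal setting is
            // as seen by this MessageQueue instance, without changing it.
            Console.WriteLine("UseJournalQueue = {0}", queue.UseJournalQueue);
        }
    }
}
```

If this reports false, the assignment in the receive task is redundant and can be dropped, which matches the speed-up described in Edit 2.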
When benchmarking, it's important to keep the code to a bare minimum in order to avoid background noise interfering with the test.
Unfortunately, your test is so noisy that it is quite hard to find what exactly is causing the delay.
I have rewritten the test and get much better results. MSMQ is not the fastest queue on the block, but it is not slow.
    var qName = @".\private$\deep_den";
    if (!MessageQueue.Exists(qName))
    {
        var q = MessageQueue.Create(qName);
    }

    var single = new MessageQueue(qName);
    single.UseJournalQueue = true;
    single.DefaultPropertiesToSend.AttachSenderId = false;
    single.DefaultPropertiesToSend.Recoverable = true;
    single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

    var count = 500;
    var watch = new Stopwatch();
    watch.Start();
    for (int i = 0; i < count; i++)
    {
        var data = new Data { Name = string.Format("name_{0}", i), Value = i };
        single.Send(new Message(data));
    }
    watch.Stop();
    Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
    Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds);

    var enu = single.GetMessageEnumerator2();
    watch.Reset();
    watch.Start();

    var queue = new MessageQueue(qName);
    queue.UseJournalQueue = true;
    queue.DefaultPropertiesToSend.AttachSenderId = false;
    queue.DefaultPropertiesToSend.Recoverable = true;
    queue.Formatter = new XmlMessageFormatter(new[] { typeof(Data) });

    List<Data> lst = new List<Data>();
    while (lst.Count != count && enu.MoveNext(TimeSpan.FromDays(1)))
    {
        var message = queue.ReceiveById(enu.Current.Id);
        lst.Add((Data)message.Body);
    }
    watch.Stop();
    Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count);
    Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds);
    Console.WriteLine("press any key to continue ...");
    Console.ReadKey();