I've got about 50K messages I want to add to an Azure queue.
I'm not sure if the code I have is safe. It feels/smells bad.
Basically, given a collection of POCOs, serialize each POCO to JSON, then add that JSON text to the queue.
```csharp
public void AddMessage(T content)
{
    content.ShouldNotBe(null);
    var json = JsonConvert.SerializeObject(content);
    var message = new CloudQueueMessage(json);
    Queue.AddMessage(message);
}

public void AddMessages(ICollection<T> contents)
{
    contents.ShouldNotBe(null);
    Parallel.ForEach(contents, AddMessage);
}
```
Can someone tell me what I should be doing to fix this up and, most importantly, why?
I suspect the Queue might not be thread-safe in this scenario.
A few things I have observed regarding Parallel.ForEach and dealing with Azure Storage (my experience has been with uploading blobs/blocks in parallel):

1. Parallel.ForEach is more suitable for processor-intensive (CPU-bound) applications.
2. One problem we ran into with Parallel.ForEach is that we started to get a lot of timeout exceptions, and it actually slowed down the entire operation. I believe the reason is that when you iterate over a collection with a large number of items this way, you are essentially handing control to the underlying framework, which decides how to deal with that collection. In this case a lot of context switching takes place, which slows the operation down. I'm not sure how this would play out in your scenario, considering your payload is smaller.

My recommendation would be to have the application control the number of parallel threads it spawns. A good criterion would be the number of logical processors; another would be the number of ports IE can open. Spawn that many parallel threads, and then either wait for all of them to finish before spawning the next set, or start a new thread as soon as one task finishes.
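If you'd rather keep Parallel.ForEach from the question but take the "how many at once" decision away from the framework, `ParallelOptions.MaxDegreeOfParallelism` caps the number of concurrent invocations. Below is a minimal, self-contained sketch of that. The `send` delegate is a hypothetical stand-in for `AddMessage` / `Queue.AddMessage` so the example runs without the Azure SDK, and the peak-concurrency counter exists only to show that the cap holds.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedParallelSend
{
    // Sends every item via 'send' with at most 'maxParallel' calls running at once.
    // 'send' is a hypothetical stand-in for AddMessage / Queue.AddMessage.
    // Returns the highest number of concurrent calls actually observed.
    public static int SendAll<T>(IEnumerable<T> items, Action<T> send, int maxParallel)
    {
        if (items == null) throw new ArgumentNullException(nameof(items));
        if (send == null) throw new ArgumentNullException(nameof(send));
        if (maxParallel < 1) throw new ArgumentOutOfRangeException(nameof(maxParallel));

        int inFlight = 0, peak = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        Parallel.ForEach(items, options, item =>
        {
            int now = Interlocked.Increment(ref inFlight);

            // Thread-safe "peak = max(peak, now)" via a compare-and-swap loop.
            int seen = Volatile.Read(ref peak);
            while (now > seen)
            {
                int prev = Interlocked.CompareExchange(ref peak, now, seen);
                if (prev == seen) break;
                seen = prev;
            }

            try { send(item); }
            finally { Interlocked.Decrement(ref inFlight); }
        });

        return peak;
    }
}
```

In the question's class this would look something like `BoundedParallelSend.SendAll(contents, AddMessage, Environment.ProcessorCount);`. Note that each worker still blocks on the network call; the cap only bounds how many threads do so at once.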
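Sketch of the batch approach (assuming an AddMessage(string) method like the one in the question):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Members of the class that also defines AddMessage(string).
ICollection<string> messageContents;

private void AddMessages()
{
    // Cap the number of concurrent sends at the number of logical processors.
    int maxParallelThreads = Environment.ProcessorCount;
    List<string> items = messageContents.ToList();

    for (int offset = 0; offset < items.Count; offset += maxParallelThreads)
    {
        // Next batch of at most maxParallelThreads items.
        List<string> batch = items.GetRange(offset, Math.Min(maxParallelThreads, items.Count - offset));

        // One task per item; each lambda captures its own 'item',
        // not a shared loop counter that changes before the task runs.
        Task[] tasks = batch.Select(item => Task.Run(() => AddMessage(item))).ToArray();

        // Wait for the whole batch to finish before starting the next one.
        Task.WaitAll(tasks);
    }
}
```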
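The batch version waits for the slowest item in each batch. The other option mentioned above, starting a new send as soon as one finishes, can be sketched with a `SemaphoreSlim` as a sliding window. This is a minimal sketch under the assumption that you have an async enqueue call; `sendAsync` is a hypothetical stand-in for it, not the storage library's API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SlidingWindowSend
{
    // Keeps at most 'maxConcurrent' sends in flight and starts the next one
    // as soon as any in-flight send completes.
    // 'sendAsync' is a hypothetical stand-in for an async version of AddMessage.
    public static async Task SendAllAsync<T>(
        IEnumerable<T> items, Func<T, Task> sendAsync, int maxConcurrent)
    {
        if (items == null) throw new ArgumentNullException(nameof(items));
        if (sendAsync == null) throw new ArgumentNullException(nameof(sendAsync));
        if (maxConcurrent < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrent));

        var gate = new SemaphoreSlim(maxConcurrent);
        var tasks = new List<Task>();

        foreach (T item in items)
        {
            // Wait for a free slot before starting the next send.
            await gate.WaitAsync().ConfigureAwait(false);
            tasks.Add(SendOneAsync(item));
        }

        // Wait for the remaining sends; rethrows if any of them failed.
        await Task.WhenAll(tasks).ConfigureAwait(false);

        async Task SendOneAsync(T item)
        {
            try { await sendAsync(item).ConfigureAwait(false); }
            finally { gate.Release(); } // free the slot even if the send failed
        }
    }
}
```

Because no thread is parked while a request is in flight, this tends to suit I/O-bound work better than a fixed number of blocked threads. One design choice to be aware of: a failed send does not stop the loop, so every item is attempted and the first failure surfaces from `Task.WhenAll` at the end.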
Your code looks fine to me at a high level. Gaurav's additions make sense, since they give you more control over the parallel processing of your requests. Make sure you add some form of retry logic, and consider setting ServicePointManager.DefaultConnectionLimit to something greater than its default value (which is 2). Depending on the type of errors you get, you may also consider spreading the messages over multiple Azure queues across multiple storage accounts if you hit some form of throttling.
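A minimal sketch of the retry part, using exponential backoff (wait `baseDelay`, then twice that, then four times, and so on). Which exceptions count as transient is an assumption you supply through the hypothetical `isTransient` predicate; this is a generic helper, not something from the storage SDK.

```csharp
using System;
using System.Threading;

public static class RetryHelper
{
    // Runs 'action', retrying up to 'maxAttempts' times in total when
    // 'isTransient' says the exception is worth retrying.
    // Delay before retry n is baseDelay * 2^(n - 1); keep maxAttempts small.
    public static void Execute(Action action, Func<Exception, bool> isTransient,
                               int maxAttempts, TimeSpan baseDelay)
    {
        if (action == null) throw new ArgumentNullException(nameof(action));
        if (isTransient == null) throw new ArgumentNullException(nameof(isTransient));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                action();
                return; // success
            }
            catch (Exception ex) when (attempt < maxAttempts && isTransient(ex))
            {
                // Transient failure with attempts left: back off, then loop again.
                // On the last attempt, or for non-transient errors, the filter is
                // false and the exception propagates to the caller.
                var delay = TimeSpan.FromTicks(baseDelay.Ticks * (1L << (attempt - 1)));
                Thread.Sleep(delay);
            }
        }
    }
}
```

For example, `RetryHelper.Execute(() => Queue.AddMessage(message), ex => ex is TimeoutException, 5, TimeSpan.FromMilliseconds(200));`. The `TimeoutException` filter there is only illustrative. The storage client library also has configurable retry policies of its own, which may make a hand-rolled loop unnecessary. For the connection limit, set `ServicePointManager.DefaultConnectionLimit` once at startup, before the first request goes out.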