Trying to batch AddMessage to an Azure Queue

I've got about 50K messages I wish to add to an Azure queue.

I'm not sure if the code I have is safe. It feels/smells bad.

Basically, given a collection of POCOs, I serialize each POCO to JSON, then add that JSON text to the queue.

public void AddMessage(T content)
{
    content.ShouldNotBe(null);

    var json = JsonConvert.SerializeObject(content);
    var message = new CloudQueueMessage(json);
    Queue.AddMessage(message);
}

public void AddMessages(ICollection<T> contents)
{
    contents.ShouldNotBe(null);
    Parallel.ForEach(contents, AddMessage);
}

Can someone tell me what I should be doing to fix this up -- and most importantly, why?

I feel that the Queue might not be thread safe in this scenario.

asked Mar 20 '23 by Pure.Krome

2 Answers

A few things I have observed about using Parallel.ForEach with Azure Storage (my experience has been with uploading blobs/blocks in parallel):

  • Azure Storage operations are network (IO) based rather than processor intensive. If I am not mistaken, Parallel.ForEach is better suited to processor-intensive work. (For an IO-oriented alternative, see the sketch after this list.)
  • Another thing we noticed when uploading a large number of blobs (or blocks) with Parallel.ForEach is that we started to get a lot of timeout exceptions, which actually slowed down the entire operation. I believe the reason is that when you iterate over a collection with a large number of items this way, you are essentially handing control to the underlying framework, which decides how to deal with that collection. A lot of context switching then takes place, which slows down the operation. I'm not sure how this would play out in your scenario, considering the payload is smaller.
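
As a contrast to Parallel.ForEach, here is a minimal sketch of a throttled, IO-friendly alternative. It assumes the Task-based AddMessageAsync overload of the classic Microsoft.WindowsAzure.Storage client; QueueWriter<T>, AddMessagesAsync and maxConcurrency are illustrative names, not part of the original question, so adapt them to your own wrapper.

    // Sketch only (not the answer's code): a throttled async approach for IO-bound work.
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.WindowsAzure.Storage.Queue;
    using Newtonsoft.Json;

    public class QueueWriter<T>
    {
        private readonly CloudQueue _queue;

        public QueueWriter(CloudQueue queue)
        {
            _queue = queue;
        }

        public async Task AddMessagesAsync(IEnumerable<T> contents, int maxConcurrency = 16)
        {
            // Cap the number of in-flight requests instead of letting the scheduler decide.
            using (var throttle = new SemaphoreSlim(maxConcurrency))
            {
                var tasks = contents.Select(async content =>
                {
                    await throttle.WaitAsync();
                    try
                    {
                        var json = JsonConvert.SerializeObject(content);
                        await _queue.AddMessageAsync(new CloudQueueMessage(json));
                    }
                    finally
                    {
                        throttle.Release();
                    }
                }).ToList(); // materialize so all tasks start before awaiting

                await Task.WhenAll(tasks);
            }
        }
    }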

My recommendation would be to have the application control the number of parallel threads it can spawn. A good criterion would be the number of logical processors. Another good criterion would be the number of ports IE can open. So you would spawn that many parallel threads, and then either wait for all of them to finish before spawning the next set, or start a new thread as soon as one task finishes.

Pseudo Code:

    ICollection<string> messageContents;

    private void AddMessages()
    {
        int maxParallelThreads = Math.Min(Environment.ProcessorCount, messageContents.Count);
        if (maxParallelThreads > 0)
        {
            // Materialize this batch so it can be indexed.
            var itemsToAdd = messageContents.Take(maxParallelThreads).ToList();
            var tasks = new List<Task>();
            for (var i = 0; i < maxParallelThreads; i++)
            {
                // Capture the item itself; capturing the loop variable inside
                // the lambda would be a closure bug.
                var item = itemsToAdd[i];
                tasks.Add(Task.Factory.StartNew(() =>
                {
                    AddMessage(item);
                    RemoveItemFromCollection(item); // assumed helper: removes the processed item from messageContents
                }));
            }
            Task.WaitAll(tasks.ToArray());
            AddMessages();
        }
    }

answered Apr 03 '23 by Gaurav Mantri

Your code looks fine to me at a high level. Gaurav's additions make sense, giving you more control over the parallel processing of your requests. Make sure you add some form of retry logic, and consider setting DefaultConnectionLimit to something greater than its default value (which is 2). You may also consider spreading the messages across multiple Azure queues in multiple storage accounts if you hit some form of throttling, depending on the type of errors you are getting.
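
A minimal sketch of those two suggestions, assuming the classic Microsoft.WindowsAzure.Storage client; the retry values and the QueueClientSetup/CreateClient names are illustrative, not prescribed by the answer:

    using System;
    using System.Net;
    using Microsoft.WindowsAzure.Storage;
    using Microsoft.WindowsAzure.Storage.Queue;
    using Microsoft.WindowsAzure.Storage.RetryPolicies;

    public static class QueueClientSetup
    {
        public static CloudQueueClient CreateClient(string connectionString)
        {
            // Raise the outbound connection limit before making any requests;
            // the default of 2 per endpoint bottlenecks parallel AddMessage calls.
            ServicePointManager.DefaultConnectionLimit = 100;

            var account = CloudStorageAccount.Parse(connectionString);
            var client = account.CreateCloudQueueClient();

            // Retry transient failures with exponential back-off:
            // up to 5 attempts, starting from a 2 second delta.
            client.DefaultRequestOptions.RetryPolicy =
                new ExponentialRetry(TimeSpan.FromSeconds(2), 5);

            return client;
        }
    }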

answered Apr 03 '23 by Herve Roggero