Parallel.ForEach iterating items in collection multiple times

I have a Parallel.ForEach running inside a Task. It iterates over a collection of email addresses and sends a MailMessage to the SMTP queue for each one. Once a message is sent, it updates a table in the DB with the result.

I can see in the DB that it's sending the same MailMessage to the queue multiple times, sometimes up to 6 times. Here is my simplified code. Can anyone recommend a better approach?

On button click, I create a new Task...

CampaignManager.Broadcast.BroadcastService broadcastService = new CampaignManager.Broadcast.BroadcastService();

var task = Task<CampaignManager.Broadcast.Results.Broadcast>.Factory.StartNew(() => {
    return broadcastService.BroadcastCampaign();
}, TaskCreationOptions.LongRunning);

Task.WaitAny(task);

if (task.Result != null)
{
    Broadcast.Results.Broadcast broadcastResult = task.Result;
    MessageBox.Show(broadcastResult.BroadcastSent.GroupName + " completed. " + broadcastResult.NumberSuccessful + " sent.");
}

This creates a task, which basically gets a ConcurrentBag of subscribers (custom class), iterates over the collection and sends a message...

public Results.Broadcast BroadcastCampaign()
{
    // Get ConcurrentBag of subscribers
    subscribers = broadcast.GetSubscribers();

    // Iterate through subscribers and send them a message
    Parallel.ForEach(subscribers, subscriber =>
    {
        // do some work, send to SMTP queue

        // Add to DB log
    });

    // return result
}

I am led to believe that ConcurrentBags are thread-safe, so I'm not sure why it would be iterating over some items in the collection multiple times. Out of a thousand subscribers, about 10% end up with at least 2 messages queued.

Thanks,

Greg.

gfyans asked Aug 15 '11 20:08


1 Answer

"I am led to believe that ConcurrentBag's are thread-safe, so I'm not sure why it would be iterating over some in the collection multiple times."

Your assumption here is correct. In fact, ConcurrentBag<T>'s GetEnumerator() method (used for enumerating the collection) takes a snapshot of the bag's contents at the moment it is called, so you're iterating over a copy of the collection. Parallel.ForEach then passes each element of that copy to the loop body once.
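To see the snapshot behaviour in isolation, here is a minimal sketch. The class name, method name, and addresses are made up for illustration and are not from the question's code. It adds to the bag while enumerating it, which would throw on a List<T> but is safe here because the enumerator works from a copy.

```csharp
using System;
using System.Collections.Concurrent;

static class SnapshotDemo
{
    // Returns how many items a single enumeration visits
    // while the bag is being modified inside the loop.
    public static int CountVisited(ConcurrentBag<string> bag)
    {
        int visited = 0;
        foreach (string address in bag)
        {
            // Allowed: the enumerator reads a moment-in-time snapshot,
            // so this new item is not seen by the current loop.
            bag.Add("late-" + address);
            visited++;
        }
        return visited;
    }

    static void Main()
    {
        // Hypothetical sample data.
        var bag = new ConcurrentBag<string> { "a@example.com", "b@example.com" };

        int visited = CountVisited(bag);

        Console.WriteLine(visited);   // 2: only the items present when enumeration began
        Console.WriteLine(bag.Count); // 4: the two originals plus the two added in the loop
    }
}
```

So the enumeration itself should not hand out the same element twice; a repeat in the output points at repeats in the input or at something outside the loop.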

If you're seeing the queue being called multiple times for a single subscriber, it means that you added that subscriber to the ConcurrentBag<T> multiple times, or there is some other issue going on...
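One way to check the first possibility is to look for repeated entries in the bag before sending. The sketch below is only an illustration: it assumes subscribers can be identified by an email string (the question uses a custom subscriber class whose shape isn't shown), and DuplicateCheck, FindDuplicates, and SendOnce are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class DuplicateCheck
{
    // Returns every address that appears more than once (case-insensitive).
    public static List<string> FindDuplicates(IEnumerable<string> addresses)
    {
        return addresses
            .GroupBy(a => a, StringComparer.OrdinalIgnoreCase)
            .Where(g => g.Count() > 1)
            .Select(g => g.Key)
            .ToList();
    }

    // Processes each distinct address once and returns how many were processed.
    public static int SendOnce(IEnumerable<string> addresses)
    {
        var sent = new ConcurrentBag<string>();
        Parallel.ForEach(addresses.Distinct(StringComparer.OrdinalIgnoreCase), address =>
        {
            // Stand-in for "send to SMTP queue, add to DB log".
            sent.Add(address);
        });
        return sent.Count;
    }

    static void Main()
    {
        // Hypothetical data: the same address was added twice upstream.
        var subscribers = new ConcurrentBag<string>
        {
            "a@example.com", "b@example.com", "a@example.com"
        };

        Console.WriteLine("Duplicates: " + string.Join(", ", FindDuplicates(subscribers)));
        Console.WriteLine("Sent: " + SendOnce(subscribers));
    }
}
```

If FindDuplicates comes back empty on the real data, the bag is not the culprit and the repeat sends are more likely coming from elsewhere, for example the broadcast being started more than once.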


On a separate note, the use of a Task here is really not necessary. It only adds overhead: it creates a dedicated thread in this case, and the calling thread then immediately blocks and waits on it. It would be much better to just rewrite this to call your method directly, like so:

CampaignManager.Broadcast.BroadcastService broadcastService = new CampaignManager.Broadcast.BroadcastService();

Broadcast.Results.Broadcast broadcastResult = broadcastService.BroadcastCampaign();
MessageBox.Show(broadcastResult.BroadcastSent.GroupName + " completed. " + broadcastResult.NumberSuccessful + " sent.");

Creating a task just to wait on it immediately (Task.WaitAny) is not helpful at all. In addition, if you do want the task for some other purpose, you don't need Task.WaitAny(...) at all: reading task.Result (broadcastResult = task.Result;) blocks until the task completes.
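As a rough sketch of that last point, the example below starts a task and reads Result without any explicit wait call. ResultBlocks and its BroadcastCampaign method are hypothetical stand-ins that return an int after a short delay; they are not the real BroadcastService.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ResultBlocks
{
    // Hypothetical stand-in for broadcastService.BroadcastCampaign().
    public static int BroadcastCampaign()
    {
        Thread.Sleep(200); // simulate work
        return 1000;       // pretend 1000 messages were sent
    }

    static void Main()
    {
        var task = Task<int>.Factory.StartNew(
            () => BroadcastCampaign(),
            TaskCreationOptions.LongRunning);

        // Other work could happen here while the task runs.

        // No WaitAny/Wait needed: reading Result blocks until the task finishes.
        int numberSuccessful = task.Result;

        Console.WriteLine(task.IsCompleted);  // True
        Console.WriteLine(numberSuccessful);  // 1000
    }
}
```

If there is no other work to do between starting the task and reading the result, calling the method directly remains the simpler option.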

Reed Copsey answered Nov 19 '22 10:11