
What's the best way to have multiple threads doing work, and waiting for all of them to complete?

I'm writing a simple app (for my wife no less :-P ) that does some image manipulation (resizing, timestamping, etc.) for a potentially large batch of images. So I'm writing a library that can do this both synchronously and asynchronously. I decided to use the Event-based Asynchronous Pattern. When using this pattern, you need to raise an event when the work has been completed. This is where I'm having problems: knowing when it's all done. Basically, in my DownsizeAsync method (the async method for downsizing images) I'm doing something like this:

    public void DownsizeAsync(string[] files, string destination)
    {
        foreach (var name in files)
        {
            string temp = name; //countering the closure issue
            ThreadPool.QueueUserWorkItem(f =>
            {
                string newFileName = this.DownsizeImage(temp, destination);
                this.OnImageResized(newFileName);
            });
        }
    }

The tricky part now is knowing when they are all complete.

Here's what I've considered: Using ManualResetEvents like here: http://msdn.microsoft.com/en-us/library/3dasc8as%28VS.80%29.aspx But the problem I came across is that you can only wait on 64 or fewer events at once, and I may have many more images than that.

Second option: Have a counter that counts the images that have been done, and raise the event when the count reaches the total:

    public void DownsizeAsync(string[] files, string destination)
    {
        foreach (var name in files)
        {
            string temp = name; //countering the closure issue
            ThreadPool.QueueUserWorkItem(f =>
            {
                string newFileName = this.DownsizeImage(temp, destination);
                this.OnImageResized(newFileName);
                total++;
                if (total == files.Length)
                {
                    this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
                }
            });
        }
    }

    private volatile int total = 0;

Now this feels "hacky" and I'm not entirely sure if that's thread safe.

So, my question is, what's the best way of doing this? Is there another way to synchronize all threads? Should I not be using a ThreadPool? Thanks!!

UPDATE Based on feedback in the comments and from a few answers I've decided to take this approach:

First, I created an extension method that batches an enumerable into "batches":

    public static IEnumerable<IEnumerable<T>> GetBatches<T>(this IEnumerable<T> source, int batchCount)
    {
        for (IEnumerable<T> s = source; s.Any(); s = s.Skip(batchCount))
        {
            yield return s.Take(batchCount);
        }
    }

Basically, if you do something like this:

        foreach (IEnumerable<int> batch in Enumerable.Range(1, 95).GetBatches(10))
        {
            foreach (int i in batch)
            {
                Console.Write("{0} ", i);
            }
            Console.WriteLine();
        }

You get this output:

1 2 3 4 5 6 7 8 9 10
11 12 13 14 15 16 17 18 19 20
21 22 23 24 25 26 27 28 29 30
31 32 33 34 35 36 37 38 39 40
41 42 43 44 45 46 47 48 49 50
51 52 53 54 55 56 57 58 59 60
61 62 63 64 65 66 67 68 69 70
71 72 73 74 75 76 77 78 79 80
81 82 83 84 85 86 87 88 89 90
91 92 93 94 95
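
One thing to watch with GetBatches as written: a batchCount of 0 never terminates (Skip(0) never shrinks the sequence, so s.Any() stays true), and every batch re-enumerates the source from the start. A minimal sketch of a single-pass alternative follows; the name GetBatchesSinglePass and the BatchExtensions class are hypothetical, not part of the original code.

```csharp
using System;
using System.Collections.Generic;

public static class BatchExtensions
{
    // Hypothetical alternative to GetBatches: validates the size eagerly
    // and walks the source exactly once.
    public static IEnumerable<List<T>> GetBatchesSinglePass<T>(this IEnumerable<T> source, int batchSize)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        return Iterate(source, batchSize);
    }

    private static IEnumerable<List<T>> Iterate<T>(IEnumerable<T> source, int batchSize)
    {
        var batch = new List<T>(batchSize);
        foreach (T item in source)
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;            // hand out a full batch
                batch = new List<T>(batchSize); // start a fresh one
            }
        }
        if (batch.Count > 0)
        {
            yield return batch;                // trailing partial batch
        }
    }
}
```

Because each batch is already a List<T>, the caller would not need the extra ToList() before capturing it in a closure.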

The idea being that (as someone in the comments pointed out) there's no need to create a separate thread for each image. Therefore, I'll batch the images into [machine.cores * 2] number of batches. Then, I'll use my second approach which is simply to keep a counter going and when the counter reaches the total I'm expecting, I'll know I'm done.

The reason I'm convinced now that it is in fact thread safe, is because I've marked the total variable as volatile which according to MSDN:

The volatile modifier is usually used for a field that is accessed by multiple threads without using the lock statement to serialize access. Using the volatile modifier ensures that one thread retrieves the most up-to-date value written by another thread

means I should be in the clear (if not, please let me know!!)

So here's the code I'm going with:

    public void DownsizeAsync(string[] files, string destination)
    {
        int cores = Environment.ProcessorCount * 2;
        int batchAmount = files.Length / cores;

        foreach (var batch in files.GetBatches(batchAmount))
        {
            var temp = batch.ToList(); //counter closure issue
            ThreadPool.QueueUserWorkItem(b =>
            {
                foreach (var item in temp)
                {
                    string newFileName = this.DownsizeImage(item, destination);
                    this.OnImageResized(newFileName);
                    total++;
                    if (total == files.Length)
                    {
                        this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
                    }
                }
            });
        }
    }
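
A note on the counter: volatile makes reads and writes visible across threads, but total++ is still a separate read, add, and write, so two threads can lose an increment or both see the final value. Also, files.Length / cores is 0 whenever there are fewer files than cores * 2. Below is a minimal sketch of the same counter idea using Interlocked.Increment, which returns the new value atomically. The class name, the AllCompleted event, and the work delegate are hypothetical stand-ins for the library's own members, and batching is left out to keep it short.

```csharp
using System;
using System.Threading;

public class CompletionCounterSketch
{
    private int completed; // only read or written through Interlocked

    // Hypothetical stand-in for the DownsizeCompleted event.
    public event EventHandler AllCompleted;

    // 'work' is a hypothetical stand-in for DownsizeImage + OnImageResized.
    public void ProcessAsync(string[] files, Action<string> work)
    {
        Interlocked.Exchange(ref completed, 0);
        int expected = files.Length;
        if (expected == 0)
        {
            RaiseAllCompleted(); // nothing to do, report completion right away
            return;
        }

        foreach (string name in files)
        {
            string temp = name; // same closure precaution as above
            ThreadPool.QueueUserWorkItem(_ =>
            {
                work(temp);
                // Exactly one worker observes the final count.
                if (Interlocked.Increment(ref completed) == expected)
                {
                    RaiseAllCompleted();
                }
            });
        }
    }

    private void RaiseAllCompleted()
    {
        EventHandler handler = AllCompleted;
        if (handler != null) handler(this, EventArgs.Empty);
    }
}
```

With this shape the completion event is raised on whichever pool thread finishes last, so a UI subscriber would still have to marshal back to its own thread.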

I'm open to feedback as I'm in no way an expert on multithreading, so if anyone sees any issue with this, or has a better idea, please let me know. (Yes, this is just a home made app, but I have some ideas on how I can use the knowledge I gain here to improve our Search / Index service we use at work.) For now I'll keep this question open till I feel like I'm using the right approach. Thanks everyone for your help.

BFree asked Dec 16 '09 15:12



People also ask

How does one ensure thread synchronisation such that all threads wait until all threads arrive?

Barriers. Barriers are a synchronization mechanism that can be used to coordinate multiple threads working in parallel. A barrier allows each thread to wait until all cooperating threads have reached the same point, and then continue executing from there.
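
As a rough illustration, .NET 4.0 and later ship a System.Threading.Barrier class. The sketch below (the BarrierSketch class and its Run method are made up for the example) has every worker record its arrival, wait at the barrier, and then check that all the others had arrived too.

```csharp
using System;
using System.Threading;

public static class BarrierSketch
{
    // Returns how many workers saw everyone's phase-one work after the barrier.
    public static int Run(int workers)
    {
        int arrived = 0;
        int sawAll = 0;

        using (var barrier = new Barrier(workers))
        {
            var threads = new Thread[workers];
            for (int i = 0; i < workers; i++)
            {
                threads[i] = new Thread(() =>
                {
                    Interlocked.Increment(ref arrived); // phase one "work"
                    barrier.SignalAndWait();            // block until all workers arrive
                    if (Volatile.Read(ref arrived) == workers)
                    {
                        Interlocked.Increment(ref sawAll);
                    }
                });
                threads[i].Start();
            }

            foreach (Thread thread in threads)
            {
                thread.Join(); // keep the barrier alive until everyone is done
            }
        }

        return sawAll;
    }
}
```

A barrier suits work that proceeds in phases; for a one-shot "tell me when everything has finished" it is usually more than is needed.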

What is it called when two or more threads wait for each other to execute?

A deadlock occurs when two or more threads wait for each other to complete, or to release a resource, in such a way that they wait forever.


1 Answer

The simplest thing would be to create new threads, and then call Thread.Join on each of them. You could use a semaphore or something like it - but it's probably easier to just create new threads.
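
A minimal sketch of that approach, assuming each job is wrapped in an Action (the JoinSketch class and RunAll method are hypothetical names for illustration):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class JoinSketch
{
    // Starts one dedicated thread per job and blocks until all have finished.
    public static void RunAll(IEnumerable<Action> jobs)
    {
        var threads = new List<Thread>();
        foreach (Action job in jobs)
        {
            Action current = job; // local copy for the closure
            var thread = new Thread(() => current());
            thread.Start();
            threads.Add(thread);
        }

        foreach (Thread thread in threads)
        {
            thread.Join(); // returns immediately if the thread already ended
        }
    }
}
```

For a large batch of images it probably makes sense to hand each thread a batch rather than a single file, so the thread count stays close to the number of cores.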

In .NET 4.0 you could use Parallel Extensions to do this quite easily with tasks.
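
For example, something along these lines, where Parallel.ForEach blocks until every item has been processed and an outer task keeps the caller from blocking. The ParallelSketch class, the work delegate, and the completed callback are hypothetical stand-ins for the image code and the completion event.

```csharp
using System;
using System.Threading.Tasks;

public static class ParallelSketch
{
    // Processes every file in parallel on a background task, then invokes
    // 'completed' once. The returned Task lets a caller wait if it wants to.
    public static Task ProcessAllAsync(string[] files, Action<string> work, Action completed)
    {
        return Task.Factory.StartNew(() =>
        {
            Parallel.ForEach(files, work); // returns only after all items are done
            completed();
        });
    }
}
```

The library decides how many items to run concurrently, so there is no need to batch the files by hand.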

As another alternative which would use the threadpool, you could create a delegate and call BeginInvoke on it, to return an IAsyncResult - you can then get the WaitHandle for each result via the AsyncWaitHandle property, and call WaitHandle.WaitAll.

EDIT: As pointed out in comments, you can only call WaitAll with up to 64 handles at a time on some implementations. Alternatives could be calling WaitOne on each of them in turn, or calling WaitAll with batches. It won't really matter, so long as you're doing it from a thread which isn't going to block the threadpool. Also note that you can't call WaitAll from an STA thread.
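
A rough sketch of the "WaitOne on each in turn" option. It uses one ManualResetEvent per thread pool work item rather than BeginInvoke handles, which is a swapped-in detail for the sake of a self-contained example; the WaitInTurnSketch class and RunAndWait method are hypothetical names.

```csharp
using System;
using System.Threading;

public static class WaitInTurnSketch
{
    // Queues every job on the thread pool, then waits on each handle in turn,
    // so the 64-handle limit of WaitHandle.WaitAll never comes into play.
    public static void RunAndWait(Action[] jobs)
    {
        var handles = new ManualResetEvent[jobs.Length];
        for (int i = 0; i < jobs.Length; i++)
        {
            int index = i; // local copy for the closure
            handles[index] = new ManualResetEvent(false);
            ThreadPool.QueueUserWorkItem(_ =>
            {
                jobs[index]();
                handles[index].Set(); // signal that this job is finished
            });
        }

        foreach (ManualResetEvent handle in handles)
        {
            handle.WaitOne();  // returns at once if already signaled
            handle.Dispose();
        }
    }
}
```

The order of the waits does not matter: the loop finishes only once the slowest job has signaled.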

Jon Skeet answered Nov 03 '22 01:11
