Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Wait for pooled threads to complete

I'm sorry for a redundant question. However, I've found many solutions to my problem but none of them are very well explained. I'm hoping that it will be made clear, here.

My C# application's main thread spawns 1..n background workers using the ThreadPool. I wish for the original thread to lock until all of the workers have completed. I have researched the ManualResetEvent in particular but I'm not clear on it's use.

In pseudo:

foreach( var o in collection ) {   queue new worker(o); }  while( workers not completed ) { continue; } 

If necessary, I will know the number of workers that are about to be queued before hand.

like image 550
Jesse Hallam Avatar asked Feb 12 '09 04:02

Jesse Hallam


People also ask

When should you not use thread pool?

Thread pools do not make sense when you need thread which perform entirely dissimilar and unrelated actions, which cannot be considered "jobs"; e.g., One thread for GUI event handling, another for backend processing. Thread pools also don't make sense when processing forms a pipeline.

What happens to thread pool after it finishes its task?

Once a thread in the thread pool completes its task, it's returned to a queue of waiting threads. From this moment it can be reused. This reuse enables applications to avoid the cost of creating a new thread for each task.


2 Answers

Try this. The function takes in a list of Action delegates. It will add a ThreadPool worker entry for each item in the list. It will wait for every action to complete before returning.

public static void SpawnAndWait(IEnumerable<Action> actions) {     var list = actions.ToList();     var handles = new ManualResetEvent[actions.Count()];     for (var i = 0; i < list.Count; i++)     {         handles[i] = new ManualResetEvent(false);         var currentAction = list[i];         var currentHandle = handles[i];         Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } };         ThreadPool.QueueUserWorkItem(x => wrappedAction());     }      WaitHandle.WaitAll(handles); } 
like image 58
JaredPar Avatar answered Sep 21 '22 00:09

JaredPar


Here's a different approach - encapsulation; so your code could be as simple as:

    Forker p = new Forker();     foreach (var obj in collection)     {         var tmp = obj;         p.Fork(delegate { DoSomeWork(tmp); });     }     p.Join(); 

Where the Forker class is given below (I got bored on the train ;-p)... again, this avoids OS objects, but wraps things up quite neatly (IMO):

using System; using System.Threading;  /// <summary>Event arguments representing the completion of a parallel action.</summary> public class ParallelEventArgs : EventArgs {     private readonly object state;     private readonly Exception exception;     internal ParallelEventArgs(object state, Exception exception)     {         this.state = state;         this.exception = exception;     }      /// <summary>The opaque state object that identifies the action (null otherwise).</summary>     public object State { get { return state; } }      /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary>     public Exception Exception { get { return exception; } } }  /// <summary>Provides a caller-friendly wrapper around parallel actions.</summary> public sealed class Forker {     int running;     private readonly object joinLock = new object(), eventLock = new object();      /// <summary>Raised when all operations have completed.</summary>     public event EventHandler AllComplete     {         add { lock (eventLock) { allComplete += value; } }         remove { lock (eventLock) { allComplete -= value; } }     }     private EventHandler allComplete;     /// <summary>Raised when each operation completes.</summary>     public event EventHandler<ParallelEventArgs> ItemComplete     {         add { lock (eventLock) { itemComplete += value; } }         remove { lock (eventLock) { itemComplete -= value; } }     }     private EventHandler<ParallelEventArgs> itemComplete;      private void OnItemComplete(object state, Exception exception)     {         EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock         if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception));         if (Interlocked.Decrement(ref running) == 0)         {             EventHandler allHandler = allComplete; // don't need to lock             if (allHandler != null) allHandler(this, EventArgs.Empty);             lock (joinLock)             {                 Monitor.PulseAll(joinLock);             }         }     }      /// <summary>Adds a callback to invoke when each operation completes.</summary>     /// <returns>Current instance (for fluent API).</returns>     public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler)     {         if (handler == null) throw new ArgumentNullException("handler");         ItemComplete += handler;         return this;     }      /// <summary>Adds a callback to invoke when all operations are complete.</summary>     /// <returns>Current instance (for fluent API).</returns>     public Forker OnAllComplete(EventHandler handler)     {         if (handler == null) throw new ArgumentNullException("handler");         AllComplete += handler;         return this;     }      /// <summary>Waits for all operations to complete.</summary>     public void Join()     {         Join(-1);     }      /// <summary>Waits (with timeout) for all operations to complete.</summary>     /// <returns>Whether all operations had completed before the timeout.</returns>     public bool Join(int millisecondsTimeout)     {         lock (joinLock)         {             if (CountRunning() == 0) return true;             Thread.SpinWait(1); // try our luck...             return (CountRunning() == 0) ||                 Monitor.Wait(joinLock, millisecondsTimeout);         }     }      /// <summary>Indicates the number of incomplete operations.</summary>     /// <returns>The number of incomplete operations.</returns>     public int CountRunning()     {         return Interlocked.CompareExchange(ref running, 0, 0);     }      /// <summary>Enqueues an operation.</summary>     /// <param name="action">The operation to perform.</param>     /// <returns>The current instance (for fluent API).</returns>     public Forker Fork(ThreadStart action) { return Fork(action, null); }      /// <summary>Enqueues an operation.</summary>     /// <param name="action">The operation to perform.</param>     /// <param name="state">An opaque object, allowing the caller to identify operations.</param>     /// <returns>The current instance (for fluent API).</returns>     public Forker Fork(ThreadStart action, object state)     {         if (action == null) throw new ArgumentNullException("action");         Interlocked.Increment(ref running);         ThreadPool.QueueUserWorkItem(delegate         {             Exception exception = null;             try { action(); }             catch (Exception ex) { exception = ex;}             OnItemComplete(state, exception);         });         return this;     } } 
like image 28
Marc Gravell Avatar answered Sep 22 '22 00:09

Marc Gravell