I'm sorry for a redundant question. However, I've found many solutions to my problem but none of them are very well explained. I'm hoping that it will be made clear, here.
My C# application's main thread spawns 1..n background workers using the ThreadPool. I wish for the original thread to lock until all of the workers have completed. I have researched the ManualResetEvent in particular but I'm not clear on it's use.
In pseudo:
foreach( var o in collection ) { queue new worker(o); } while( workers not completed ) { continue; }
If necessary, I will know the number of workers that are about to be queued before hand.
Thread pools do not make sense when you need thread which perform entirely dissimilar and unrelated actions, which cannot be considered "jobs"; e.g., One thread for GUI event handling, another for backend processing. Thread pools also don't make sense when processing forms a pipeline.
Once a thread in the thread pool completes its task, it's returned to a queue of waiting threads. From this moment it can be reused. This reuse enables applications to avoid the cost of creating a new thread for each task.
Try this. The function takes in a list of Action delegates. It will add a ThreadPool worker entry for each item in the list. It will wait for every action to complete before returning.
public static void SpawnAndWait(IEnumerable<Action> actions) { var list = actions.ToList(); var handles = new ManualResetEvent[actions.Count()]; for (var i = 0; i < list.Count; i++) { handles[i] = new ManualResetEvent(false); var currentAction = list[i]; var currentHandle = handles[i]; Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } }; ThreadPool.QueueUserWorkItem(x => wrappedAction()); } WaitHandle.WaitAll(handles); }
Here's a different approach - encapsulation; so your code could be as simple as:
Forker p = new Forker(); foreach (var obj in collection) { var tmp = obj; p.Fork(delegate { DoSomeWork(tmp); }); } p.Join();
Where the Forker
class is given below (I got bored on the train ;-p)... again, this avoids OS objects, but wraps things up quite neatly (IMO):
using System; using System.Threading; /// <summary>Event arguments representing the completion of a parallel action.</summary> public class ParallelEventArgs : EventArgs { private readonly object state; private readonly Exception exception; internal ParallelEventArgs(object state, Exception exception) { this.state = state; this.exception = exception; } /// <summary>The opaque state object that identifies the action (null otherwise).</summary> public object State { get { return state; } } /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary> public Exception Exception { get { return exception; } } } /// <summary>Provides a caller-friendly wrapper around parallel actions.</summary> public sealed class Forker { int running; private readonly object joinLock = new object(), eventLock = new object(); /// <summary>Raised when all operations have completed.</summary> public event EventHandler AllComplete { add { lock (eventLock) { allComplete += value; } } remove { lock (eventLock) { allComplete -= value; } } } private EventHandler allComplete; /// <summary>Raised when each operation completes.</summary> public event EventHandler<ParallelEventArgs> ItemComplete { add { lock (eventLock) { itemComplete += value; } } remove { lock (eventLock) { itemComplete -= value; } } } private EventHandler<ParallelEventArgs> itemComplete; private void OnItemComplete(object state, Exception exception) { EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception)); if (Interlocked.Decrement(ref running) == 0) { EventHandler allHandler = allComplete; // don't need to lock if (allHandler != null) allHandler(this, EventArgs.Empty); lock (joinLock) { Monitor.PulseAll(joinLock); } } } /// <summary>Adds a callback to invoke when each operation completes.</summary> /// <returns>Current instance (for fluent API).</returns> public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler) { if (handler == null) throw new ArgumentNullException("handler"); ItemComplete += handler; return this; } /// <summary>Adds a callback to invoke when all operations are complete.</summary> /// <returns>Current instance (for fluent API).</returns> public Forker OnAllComplete(EventHandler handler) { if (handler == null) throw new ArgumentNullException("handler"); AllComplete += handler; return this; } /// <summary>Waits for all operations to complete.</summary> public void Join() { Join(-1); } /// <summary>Waits (with timeout) for all operations to complete.</summary> /// <returns>Whether all operations had completed before the timeout.</returns> public bool Join(int millisecondsTimeout) { lock (joinLock) { if (CountRunning() == 0) return true; Thread.SpinWait(1); // try our luck... return (CountRunning() == 0) || Monitor.Wait(joinLock, millisecondsTimeout); } } /// <summary>Indicates the number of incomplete operations.</summary> /// <returns>The number of incomplete operations.</returns> public int CountRunning() { return Interlocked.CompareExchange(ref running, 0, 0); } /// <summary>Enqueues an operation.</summary> /// <param name="action">The operation to perform.</param> /// <returns>The current instance (for fluent API).</returns> public Forker Fork(ThreadStart action) { return Fork(action, null); } /// <summary>Enqueues an operation.</summary> /// <param name="action">The operation to perform.</param> /// <param name="state">An opaque object, allowing the caller to identify operations.</param> /// <returns>The current instance (for fluent API).</returns> public Forker Fork(ThreadStart action, object state) { if (action == null) throw new ArgumentNullException("action"); Interlocked.Increment(ref running); ThreadPool.QueueUserWorkItem(delegate { Exception exception = null; try { action(); } catch (Exception ex) { exception = ex;} OnItemComplete(state, exception); }); return this; } }
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With