Looking for some sample code (C#) for a simple thread pool implementation.
I found one on codeproject, but the codebase was just huge and I don't need all that functionality.
This is more for educational purposes anyways.
A good thread pool keeps a set number of threads running and waiting for work. The pool could be designed to scale up with the amount of work you need to do, but I prefer specifying a fixed number of threads. A good way to choose this number is to use the number of cores/processors on the system + 1.
In computer programming, a thread pool is a software design pattern for achieving concurrency of execution in a computer program. Often also called a replicated workers or worker-crew model, a thread pool maintains multiple threads waiting for tasks to be allocated for concurrent execution by the supervising program.
This is the simplest, most naive thread-pool implementation for educational purposes I could come up with (C# / .NET 3.5). It does not use .NET's built-in thread pool implementation in any way.
using System;
using System.Collections.Generic;
using System.Threading;

namespace SimpleThreadPool
{
    /// <summary>
    /// A minimal fixed-size thread pool for educational purposes.
    /// Tasks are processed in FIFO order by dedicated worker threads, and idle
    /// workers take turns in the order in which they became idle.
    /// All shared state is guarded by the monitor of <c>_tasks</c>.
    /// </summary>
    public sealed class Pool : IDisposable
    {
        /// <summary>
        /// Creates the pool and starts <paramref name="size"/> worker threads
        /// named "Worker 0", "Worker 1", ...
        /// </summary>
        /// <param name="size">Number of worker threads; must be at least 1.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="size"/> is less than 1. A pool without workers could
        /// never drain its queue, so <see cref="Dispose"/> would block forever.
        /// </exception>
        public Pool(int size)
        {
            if (size < 1)
            {
                throw new ArgumentOutOfRangeException("size", size, "A pool needs at least one worker thread.");
            }

            this._threads = new Thread[size];
            this._workers = new LinkedList<Thread>();
            for (var i = 0; i < size; ++i)
            {
                var worker = new Thread(this.Worker) { Name = string.Concat("Worker ", i) };
                this._threads[i] = worker;
                this._workers.AddLast(worker);
            }

            // Start the threads only after the idle queue is completely built:
            // a running worker reads _workers under the lock, so the list must
            // not be mutated without the lock once any worker is alive.
            foreach (var worker in this._threads)
            {
                worker.Start();
            }
        }

        /// <summary>
        /// Stops accepting new tasks, waits until every queued task has been
        /// picked up, then signals the workers to exit and joins all of them,
        /// so tasks that are still executing are allowed to finish.
        /// Calling it again after disposal is a no-op.
        /// </summary>
        public void Dispose()
        {
            var waitForThreads = false;
            lock (this._tasks)
            {
                if (!this._disposed)
                {
                    GC.SuppressFinalize(this);

                    // From now on QueueTask rejects new work.
                    this._disallowAdd = true;

                    // Wait for the queue to drain; workers pulse whenever they remove a task.
                    while (this._tasks.Count > 0)
                    {
                        Monitor.Wait(this._tasks);
                    }

                    this._disposed = true;

                    // Wake idle workers so they observe the disposed flag and return.
                    // Workers that are still running a task see the flag once they loop again.
                    Monitor.PulseAll(this._tasks);
                    waitForThreads = true;
                }
            }

            if (waitForThreads)
            {
                // Join via the immutable thread array, not the idle queue: busy workers are
                // absent from _workers, and that list is still mutated by finishing workers.
                foreach (var worker in this._threads)
                {
                    worker.Join();
                }
            }
        }

        /// <summary>
        /// Appends <paramref name="task"/> to the queue and wakes the workers.
        /// </summary>
        /// <param name="task">The action to run on a worker thread.</param>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The pool has already been disposed.</exception>
        /// <exception cref="InvalidOperationException">
        /// The pool is currently being disposed and no longer accepts tasks.
        /// </exception>
        public void QueueTask(Action task)
        {
            if (task == null)
            {
                // Reject here; a null entry would otherwise blow up later on a worker thread.
                throw new ArgumentNullException("task");
            }

            lock (this._tasks)
            {
                // Check _disposed first: _disallowAdd is always set before _disposed,
                // so the opposite order would make this branch unreachable.
                if (this._disposed)
                {
                    throw new ObjectDisposedException(this.GetType().FullName, "This Pool instance has already been disposed");
                }

                if (this._disallowAdd)
                {
                    throw new InvalidOperationException("This Pool instance is in the process of being disposed, can't add anymore");
                }

                this._tasks.AddLast(task);
                Monitor.PulseAll(this._tasks); // pulse because tasks count changed
            }
        }

        /// <summary>
        /// Body of every worker thread: repeatedly claims the next task when it is
        /// this worker's turn, runs it outside the lock, and then re-enters the idle
        /// queue. Returns once the pool has been disposed.
        /// </summary>
        private void Worker()
        {
            Action task = null;
            while (true) // loop until the pool is disposed
            {
                lock (this._tasks) // finding a task needs to be atomic
                {
                    while (true) // wait for our turn in the idle queue and for an available task
                    {
                        if (this._disposed)
                        {
                            return;
                        }

                        // A task may only be claimed by the worker at the head of the idle queue.
                        if (null != this._workers.First
                            && object.ReferenceEquals(Thread.CurrentThread, this._workers.First.Value)
                            && this._tasks.Count > 0)
                        {
                            task = this._tasks.First.Value;
                            this._tasks.RemoveFirst();
                            this._workers.RemoveFirst();

                            // Pulse because both the head worker and the task count changed:
                            // the next idle worker may now proceed, and Dispose may see an empty queue.
                            Monitor.PulseAll(this._tasks);
                            break;
                        }

                        Monitor.Wait(this._tasks); // go to sleep, either not our turn or no task to process
                    }
                }

                task(); // process the found task outside the lock

                lock (this._tasks)
                {
                    // Back to the end of the idle queue.
                    this._workers.AddLast(Thread.CurrentThread);
                }

                task = null;
            }
        }

        private readonly Thread[] _threads; // every worker thread ever created; never modified after construction
        private readonly LinkedList<Thread> _workers; // queue of idle worker threads ready to process actions
        private readonly LinkedList<Action> _tasks = new LinkedList<Action>(); // actions to be processed by worker threads; also the lock object
        private bool _disallowAdd; // set to true when disposing the pool while tasks are still pending
        private bool _disposed; // set to true once disposing and no more tasks are pending
    }

    /// <summary>
    /// Demo: queues 40 short randomized jobs on a pool of 5 workers.
    /// </summary>
    public static class Program
    {
        static void Main()
        {
            using (var pool = new Pool(5))
            {
                var random = new Random();
                Action<int> randomizer = (index =>
                {
                    Console.WriteLine("{0}: Working on index {1}", Thread.CurrentThread.Name, index);

                    // System.Random is not thread-safe and this lambda runs on several
                    // worker threads at once, so serialize access to the shared instance.
                    int delay;
                    lock (random)
                    {
                        delay = random.Next(20, 400);
                    }

                    Thread.Sleep(delay);
                    Console.WriteLine("{0}: Ending {1}", Thread.CurrentThread.Name, index);
                });

                for (var i = 0; i < 40; ++i)
                {
                    var i1 = i; // copy so that each closure captures its own index
                    pool.QueueTask(() => randomizer(i1));
                }
            }
        }
    }
}
There is no need to implement your own, since it is not very hard to use the existing .NET implementation.
From ThreadPool Documentation:
using System;
using System.Threading;

/// <summary>
/// One unit of work for the thread pool demo: computes the Fibonacci number of
/// a given index and signals an event when the result is available.
/// </summary>
public class Fibonacci
{
    private int _n;
    private int _fibOfN;
    private ManualResetEvent _doneEvent;

    /// <summary>
    /// Stores the index to compute and the event to signal on completion.
    /// </summary>
    /// <param name="n">Index of the Fibonacci number to calculate.</param>
    /// <param name="doneEvent">Event that is set once the result has been stored.</param>
    public Fibonacci(int n, ManualResetEvent doneEvent)
    {
        _n = n;
        _doneEvent = doneEvent;
    }

    /// <summary>The index passed to the constructor.</summary>
    public int N
    {
        get { return _n; }
    }

    /// <summary>The result stored by <see cref="ThreadPoolCallback"/>; 0 until it has run.</summary>
    public int FibOfN
    {
        get { return _fibOfN; }
    }

    /// <summary>
    /// Wrapper method for use with the thread pool: logs progress, stores the
    /// result and signals the completion event.
    /// </summary>
    /// <param name="threadContext">A boxed <see cref="int"/> identifying the work item in the log output.</param>
    public void ThreadPoolCallback(Object threadContext)
    {
        int workerIndex = (int)threadContext;

        Console.WriteLine("thread {0} started...", workerIndex);
        _fibOfN = Calculate(_n);
        Console.WriteLine("thread {0} result calculated...", workerIndex);

        _doneEvent.Set();
    }

    /// <summary>
    /// Recursively calculates the Nth Fibonacci number.
    /// </summary>
    /// <param name="n">Index of the number to calculate.</param>
    /// <returns><paramref name="n"/> itself when it is 1 or less, otherwise the sum of the two preceding numbers.</returns>
    public int Calculate(int n)
    {
        return n <= 1
            ? n
            : Calculate(n - 1) + Calculate(n - 2);
    }
}

/// <summary>
/// Demo program: runs ten Fibonacci calculations on the .NET thread pool and
/// prints the results after all of them have completed.
/// </summary>
public class ThreadPoolExample
{
    static void Main()
    {
        const int JobCount = 10;

        // One completion event per Fibonacci object.
        ManualResetEvent[] completionSignals = new ManualResetEvent[JobCount];
        Fibonacci[] jobs = new Fibonacci[JobCount];
        Random rng = new Random();

        // Configure and launch the work items using ThreadPool.
        Console.WriteLine("launching {0} tasks...", JobCount);
        for (int slot = 0; slot < JobCount; slot++)
        {
            ManualResetEvent signal = new ManualResetEvent(false);
            completionSignals[slot] = signal;

            Fibonacci job = new Fibonacci(rng.Next(20, 40), signal);
            jobs[slot] = job;

            ThreadPool.QueueUserWorkItem(job.ThreadPoolCallback, slot);
        }

        // Wait for all work items in the pool to finish calculating.
        WaitHandle.WaitAll(completionSignals);
        Console.WriteLine("All calculations are complete.");

        // Display the results.
        foreach (Fibonacci job in jobs)
        {
            Console.WriteLine("Fibonacci({0}) = {1}", job.N, job.FibOfN);
        }
    }
}
If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow. Thank you!
Donate Us With