I have a subroutine that processes large blocks of information. In order to make use of the entire CPU, it divides the work up into separate threads. After all threads have completed, it finishes. I read that creating and destroying threads uses lots of overhead, so I tried using the threadpool, but that actually runs slower than creating my own threads. How can I create my own threads when the program runs and then keep reusing them? I've seen some people say it can't be done, but the threadpool does it so it must be possible, right?
Here is part of the code that launches new threads / uses the threadpool:
//initialization for threads
Thread[] AltThread = null;
if (NumThreads > 1)
    AltThread = new Thread[pub.NumThreads - 1];

do
{
    if (NumThreads > 1)
    {   //split the matrix up into NumThreads number of even-sized blocks and execute on separate threads
        int ThreadWidth = DataWidth / NumThreads;

        if (UseThreadPool) //use threadpool threads
        {
            for (int i = 0; i < NumThreads - 1; i++)
            {
                ThreadPool.QueueUserWorkItem(ComputePartialDataOnThread,
                    new object[] { AltEngine[i], ThreadWidth * (i + 1), ThreadWidth * (i + 2) });
            }
            //get number of threads available after queue
            System.Threading.Thread.Sleep(0);
            int StartThreads, empty, EndThreads;
            ThreadPool.GetAvailableThreads(out StartThreads, out empty);
            ComputePartialData(ThisEngine, 0, ThreadWidth);
            //wait for all threads to finish
            do
            {
                ThreadPool.GetAvailableThreads(out EndThreads, out empty);
                System.Threading.Thread.Sleep(1);
            } while (StartThreads - EndThreads > 0);
        }
        else //create new threads each time (can we reuse these?)
        {
            for (int i = 0; i < NumThreads - 1; i++)
            {
                AltThread[i] = new Thread(ComputePartialDataOnThread);
                AltThread[i].Start(new object[] { AltEngine[i], ThreadWidth * (i + 1), ThreadWidth * (i + 2) });
            }
            ComputePartialData(ThisEngine, 0, ThreadWidth);
            //wait for all threads to finish
            foreach (Thread t in AltThread)
                t.Join(1000);
            foreach (Thread t in AltThread)
                if (t.IsAlive) t.Abort();
        }
    }
}
ComputePartialDataOnThread simply unpacks the arguments and calls ComputePartialData. The data being processed is shared among the threads (they never read/write the same locations). AltEngine[] holds a separate computation engine for each thread.
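For reference, it looks roughly like this (simplified; "EngineType" here is just a stand-in for the actual type of my computation engine):

void ComputePartialDataOnThread(object state)
{
    // arguments are packed as { engine, start, end } at the call site
    object[] args = (object[])state;
    ComputePartialData((EngineType)args[0], (int)args[1], (int)args[2]);
}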
The operation runs about 10-20% slower when using the threadpool.
This sounds like a fairly common requirement that can be solved with a multi-threaded producer-consumer queue. The threads are kept alive and are signalled to do work whenever new work is added to the queue. The work is represented by a delegate (in your case ComputePartialDataOnThread), and the data passed to the delegate is what gets queued (in your case the parameters to ComputePartialDataOnThread). The useful feature is that the management of the worker threads and the actual algorithm stay separate. Here is the producer-consumer queue:
using System;
using System.Collections.Generic;
using System.Threading;

public class SuperQueue<T> : IDisposable where T : class
{
    readonly object _locker = new object();
    readonly List<Thread> _workers;
    readonly Queue<T> _taskQueue = new Queue<T>();
    readonly Action<T> _dequeueAction;

    /// <summary>
    /// Initializes a new instance of the <see cref="SuperQueue{T}"/> class.
    /// </summary>
    /// <param name="workerCount">The worker count.</param>
    /// <param name="dequeueAction">The dequeue action.</param>
    public SuperQueue(int workerCount, Action<T> dequeueAction)
    {
        _dequeueAction = dequeueAction;
        _workers = new List<Thread>(workerCount);

        // Create and start a separate thread for each worker
        for (int i = 0; i < workerCount; i++)
        {
            Thread t = new Thread(Consume) { IsBackground = true, Name = string.Format("SuperQueue worker {0}", i) };
            _workers.Add(t);
            t.Start();
        }
    }

    /// <summary>
    /// Enqueues the task.
    /// </summary>
    /// <param name="task">The task.</param>
    public void EnqueueTask(T task)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(task);
            Monitor.PulseAll(_locker);
        }
    }

    /// <summary>
    /// Consumes this instance.
    /// </summary>
    void Consume()
    {
        while (true)
        {
            T item;
            lock (_locker)
            {
                // sleep until something is queued, then take it
                while (_taskQueue.Count == 0) Monitor.Wait(_locker);
                item = _taskQueue.Dequeue();
            }

            // a null task is the signal for this worker to exit
            if (item == null) return;

            // run actual method
            _dequeueAction(item);
        }
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    public void Dispose()
    {
        // Enqueue one null task per worker to make each exit.
        _workers.ForEach(thread => EnqueueTask(null));
        _workers.ForEach(thread => thread.Join());
    }
}
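To plug this into your loop, one possibility (a sketch, not part of the class above) is to make T the same object[] payload you already build, and signal completion of each batch with a CountdownEvent (.NET 4); the extra fourth array slot and the CountdownEvent wiring are my own additions, not anything from your code:

// One long-lived queue, created once at startup, with one worker per extra core.
SuperQueue<object[]> queue = new SuperQueue<object[]>(NumThreads - 1, args =>
{
    ComputePartialDataOnThread(args);       // your existing unpack-and-compute method
    ((CountdownEvent)args[3]).Signal();     // assumed 4th slot: the batch's countdown
});

// Each pass over the matrix:
int ThreadWidth = DataWidth / NumThreads;
using (CountdownEvent done = new CountdownEvent(NumThreads - 1))
{
    for (int i = 0; i < NumThreads - 1; i++)
        queue.EnqueueTask(new object[] { AltEngine[i], ThreadWidth * (i + 1), ThreadWidth * (i + 2), done });

    ComputePartialData(ThisEngine, 0, ThreadWidth);  // this thread still does the first block
    done.Wait();                                     // block until every worker has signalled
}

The worker threads are created once and then reused for every pass, which is exactly the behaviour you were after.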
As previous posters have said, there are many built-in constructs (look at the TPL) that use the ThreadPool, and which you may want to look at before implementing your own queue.
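For example, on .NET 4 or later the same fan-out could be written with Parallel.For, which schedules the blocks on pool threads and waits for them all to finish. This is only a sketch that reuses the names from your snippet and assumes each block is independent, as you describe:

// requires: using System.Threading.Tasks;
int ThreadWidth = DataWidth / NumThreads;
Parallel.For(0, NumThreads, i =>
{
    // block 0 runs with ThisEngine, the others each with their own AltEngine
    var engine = (i == 0) ? ThisEngine : AltEngine[i - 1];
    ComputePartialData(engine, ThreadWidth * i, ThreadWidth * (i + 1));
});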