Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Query on simple C++ threadpool implementation

Stackoverflow has been a tremendous help to me and I'd to give something back to the community. I have been implementing a simple threadpool using the TinyThread++ website C++ portable thread library, using what I have learnt from Stackoverflow. I am new to thread programming, so not that comfortable with mutexes, etc. I have a question best asked after presenting the code (which runs quite well under Linux):

// ThreadPool.h

class ThreadPool
{
 public:

 ThreadPool();
~ThreadPool();

// Creates a pool of threads and gets them ready to be used
void CreateThreads(int numOfThreads);

// Assigns a job to a thread in the pool, but doesn't start the job
// Each SubmitJob call will use up one thread of the pool.
// This operation can only be undone by calling StartJobs and
// then waiting for the jobs to complete. On completion,
// new jobs may be submitted.
void SubmitJob( void (*workFunc)(void *), void *workData );

// Begins execution of all the jobs in the pool.
void StartJobs();

// Waits until all jobs have completed.
// The wait will block the caller.
// On completion, new jobs may be submitted.
void WaitForJobsToComplete();

private:

enum typeOfWorkEnum { e_work, e_quit };

 class ThreadData
 {
   public:

    bool ready;  // thread has been created and is ready for work  
    bool haveWorkToDo;
    typeOfWorkEnum  typeOfWork;

    // Pointer to the work function each thread has to call.
    void (*workFunc)(void *);

    // Pointer to work data
    void *workData;

    ThreadData() : ready(false), haveWorkToDo(false) {  };
 };

struct ThreadArgStruct
{
    ThreadPool *threadPoolInstance;
    int         threadId;
};

// Data for each thread
ThreadData  *m_ThreadData;

ThreadPool(ThreadPool const&); // copy ctor hidden
ThreadPool& operator=(ThreadPool const&); // assign op. hidden

// Static function that provides the function pointer that a thread can call
// By including the ThreadPool instance in the void * parameter,
// we can use it to access other data and methods in the ThreadPool instance.
static void ThreadFuncWrapper(void *arg)
{
    ThreadArgStruct *threadArg = static_cast<ThreadArgStruct *>(arg);
    threadArg->threadPoolInstance->ThreadFunc(threadArg->threadId);
}

// The function each thread calls    
void ThreadFunc( int threadId );

// Called by the thread pool destructor
void DestroyThreadPool();

// Total number of threads available
// (fixed on creation of thread pool)
int m_numOfThreads;
int m_NumOfThreadsDoingWork;
int m_NumOfThreadsGivenJobs;

// List of threads
std::vector<tthread::thread *> m_ThreadList;

// Condition variable to signal each thread has been created and executing
tthread::mutex              m_ThreadReady_mutex;
tthread::condition_variable m_ThreadReady_condvar;

 // Condition variable to signal each thread to start work
tthread::mutex              m_WorkToDo_mutex;
tthread::condition_variable m_WorkToDo_condvar;

// Condition variable to signal the main thread that 
// all threads in the pool have completed their work
tthread::mutex              m_WorkCompleted_mutex;
tthread::condition_variable m_WorkCompleted_condvar;
};

cpp file:

//
//  ThreadPool.cpp
//

#include "ThreadPool.h"    

// This is the thread function for each thread.
// All threads remain in this function until
// they are asked to quit, which only happens
// when terminating the thread pool.
void ThreadPool::ThreadFunc( int threadId )
{
 ThreadData *myThreadData = &m_ThreadData[threadId]; 
 std::cout << "Hello world: Thread " << threadId << std::endl;

 // Signal that this thread is ready
 m_ThreadReady_mutex.lock();
       myThreadData->ready = true;
       m_ThreadReady_condvar.notify_one(); // notify the main thread
 m_ThreadReady_mutex.unlock();       

 while(true)
 {
    //tthread::lock_guard<tthread::mutex> guard(m);
    m_WorkToDo_mutex.lock();

    while(!myThreadData->haveWorkToDo) // check for work to do
         m_WorkToDo_condvar.wait(m_WorkToDo_mutex); // if no work, wait here 
    myThreadData->haveWorkToDo = false; // need to do this before unlocking the mutex

    m_WorkToDo_mutex.unlock();

    // Do the work
    switch(myThreadData->typeOfWork)
    {
        case e_work:
            std::cout << "Thread " << threadId << ": Woken with work to do\n";

            // Do work
            myThreadData->workFunc(myThreadData->workData);

            std::cout << "#Thread " << threadId  << ": Work is completed\n";
            break;

         case e_quit:
             std::cout << "Thread " << threadId << ": Asked to quit\n";
             return; // ends the thread
    }

    // Now to signal the main thread that my work is completed
    m_WorkCompleted_mutex.lock();
       m_NumOfThreadsDoingWork--;

      // Unsure if this 'if' would make the program more efficient
      // if(m_NumOfThreadsDoingWork == 0)
           m_WorkCompleted_condvar.notify_one(); // notify the main thread
    m_WorkCompleted_mutex.unlock();       
  }

}


ThreadPool::ThreadPool() 
{ 
   m_numOfThreads = 0;  m_NumOfThreadsDoingWork = 0; m_NumOfThreadsGivenJobs = 0;
}


ThreadPool::~ThreadPool()
{
    if(m_numOfThreads)
    {
    DestroyThreadPool(); 
    delete [] m_ThreadData;
    }
}


void ThreadPool::CreateThreads(int numOfThreads)
{
// Check if a thread pool has already been created
if(m_numOfThreads > 0) 
   return;

m_NumOfThreadsGivenJobs = 0;
m_NumOfThreadsDoingWork = 0;
m_numOfThreads = numOfThreads;
m_ThreadData = new ThreadData[m_numOfThreads];
ThreadArgStruct threadArg;

for(int i=0; i<m_numOfThreads; ++i)
 {   
    threadArg.threadId = i;
    threadArg.threadPoolInstance = this;

    // Creates the thread and saves it in a list so we can destroy it later
    m_ThreadList.push_back( new tthread::thread( ThreadFuncWrapper, (void *)&threadArg  ) ); 

    // It takes a little time for a thread to get established.
    // Best wait until it gets established before creating the next thread.
    m_ThreadReady_mutex.lock();
    while(!m_ThreadData[i].ready)  // Check if thread is ready
        m_ThreadReady_condvar.wait(m_ThreadReady_mutex); // If not, wait here
    m_ThreadReady_mutex.unlock();    
 } 
} 


// Assigns a job to a thread, but doesn't start the job
void ThreadPool::SubmitJob(void (*workFunc)(void *), void *workData)
{
 // Check if the thread pool has been created
 if(!m_numOfThreads) 
    return;

 if(m_NumOfThreadsGivenJobs >= m_numOfThreads)
    return;

 m_ThreadData[m_NumOfThreadsGivenJobs].workFunc = workFunc;
 m_ThreadData[m_NumOfThreadsGivenJobs].workData = workData;  

 std::cout << "Submitted job " << m_NumOfThreadsGivenJobs << std::endl;

 m_NumOfThreadsGivenJobs++;  
}

void ThreadPool::StartJobs()
{
// Check that the thread pool has been created
// and some jobs have been assigned
if(!m_numOfThreads || !m_NumOfThreadsGivenJobs) 
   return;

// Set 'haveworkToDo' flag for all threads 
m_WorkToDo_mutex.lock();
   for(int i=0; i<m_NumOfThreadsGivenJobs; ++i)
   {
       m_ThreadData[i].typeOfWork = e_work;  // forgot to do this !
       m_ThreadData[i].haveWorkToDo = true;
   }
   m_NumOfThreadsDoingWork = m_NumOfThreadsGivenJobs;

   // Reset this counter so we can resubmit jobs later
   m_NumOfThreadsGivenJobs = 0;

   // Notify all threads they have work to do
   m_WorkToDo_condvar.notify_all();
   m_WorkToDo_mutex.unlock();
}


void ThreadPool::WaitForJobsToComplete()
{
  // Check that a thread pool has been created
  if(!m_numOfThreads) 
   return;

 m_WorkCompleted_mutex.lock();
 while(m_NumOfThreadsDoingWork > 0)  // Check if all threads have completed their work
   m_WorkCompleted_condvar.wait(m_WorkCompleted_mutex); // If not, wait here
 m_WorkCompleted_mutex.unlock();    
}


void ThreadPool::DestroyThreadPool()
{
std::cout << "Ask threads to quit\n";
m_WorkToDo_mutex.lock();
   for(int i=0; i<m_numOfThreads; ++i)
   {
     m_ThreadData[i].haveWorkToDo = true;
     m_ThreadData[i].typeOfWork = e_quit;
   }
   m_WorkToDo_condvar.notify_all();
m_WorkToDo_mutex.unlock();

// As each thread terminates, catch them here
for(int i=0; i<m_numOfThreads; ++i)
 {
     tthread::thread *t = m_ThreadList[i];

     // Wait for thread to complete
     t->join();
 }
 m_numOfThreads = 0;
}

Example of usage: (this calculates pi-squared/6 by summing reciprocals of squares) Actually, this usage example runs the same calculation 10 times in parallel. A more practical usage would be for each thread to compute a different set of the summed terms. The final result is then obtained by adding all the thread results once the pool job has completed.

struct CalculationDataStruct
{
int inputVal;
double outputVal;
};

void LongCalculation( void *theSums )
{
CalculationDataStruct *sums = (CalculationDataStruct *)theSums;

int terms = sums->inputVal;
double sum;
for(int i=1; i<terms; i++)
    sum += 1.0/( double(i)*double(i) );
sums->outputVal = sum;
}


int main(int argc, char** argv)
{ 
int numThreads = 10;

// Create pool
ThreadPool threadPool;
threadPool.CreateThreads(numThreads);

// Create thread workspace
CalculationDataStruct sums[numThreads];

// Set up jobs
for(int i=0; i<numThreads; i++)
{
    sums[i].inputVal = 3000*(i+1);
    threadPool.SubmitJob(LongCalculation, &sums[i]);
}

// Run the jobs
threadPool.StartJobs();
threadPool.WaitForJobsToComplete();

// Print results
for(int i=0; i<numThreads; i++)
   std::cout << "Sum of " << sums[i].inputVal << " terms is " << sums[i].outputVal << std::endl;

 return 0;
}

Question: In the ThreadPool::ThreadFunc method, would better performance be obtained if the following if statement

if(NumOfThreadsDoingWork == 0)

was included? Also, I'd be grateful of criticisms and ways to improve the code. At the same time, I hope the code is of use to others.

like image 883
ticketman Avatar asked Jun 27 '12 21:06

ticketman


1 Answers

Firstly, you may want to look into C++11's "std::thread" and "std::mutex". You may also want to investigate Intel's "Threading Building Blocks" which provides a number of patterns for work distribution. For a portable, cross-platform, C++-encapsulated API I have generally used the OpenThreads library. Lastly, you can build scalable, distributed work loads without mutexes using a message passing library, like ZeroMQ.

Looking at your current code, my biggest concern would be that you don't appear to be locking the variables used to assign work to threads; I'm assuming that's because you've separated SubmitJob and StartWork.

But ultimately, your ThreadPool isn't thread safe.

It's also a bit of a complex API with the types of work, etc. You probably need to abstract out the concept of "work". Here's an example where I'd done that, you'd probably want to encapsulate the bulk of the code back into your ThreadPool class; also the termination method (NULL job) is kinda artificial, you'd probably want to use pthread_cancel, but this served this demonstration fairly well.

#include <queue>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

static int jobNo = 0;
class Job {
public:
    Job() : m_i(++jobNo) { printf("Created job %d.\n", m_i); }
    int m_i;
    void Execute() { printf("Job %d executing.\n", m_i); usleep(500 * 1000); }
};

std::queue<Job*> queue;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void AddJob(Job* job) {
    pthread_mutex_lock(&mutex);
    queue.push(job);
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);
}

void* QueueWorker(void* /*threadInfo*/) {
    Job* job = NULL;
    for (;;) {
        pthread_mutex_lock(&mutex);
        while ( queue.empty() ) {
            // unlock the mutex until the cond is signal()d or broadcast() to.
            // if this call succeeds, we will have the mutex locked again on the other side.
            pthread_cond_wait(&cond, &mutex);
        }
        // take the first task and then release the lock.
        job = queue.front();
        queue.pop();
        pthread_mutex_unlock(&mutex);

        if ( job == NULL ) {
            // in this demonstration, NULL ends the run, so forward to any other threads.
            AddJob(NULL);
            break;
        }
        job->Execute();
        delete job;
    }
    return NULL;
}

int main(int argc, const char* argv[]) {
    pthread_t worker1, worker2;
    pthread_create(&worker1, NULL, &QueueWorker, NULL);
    pthread_create(&worker2, NULL, &QueueWorker, NULL);

    srand(time(NULL));

    // queue 5 jobs with delays.
    for ( size_t i = 0; i < 5; ++i ) {
        long delay = (rand() % 800) * 1000;
        printf("Producer sleeping %fs\n", (float)delay / (1000*1000));
        usleep(delay);
        Job* job = new Job();
        AddJob(job);
    }
    // 5 more without delays.
    for ( size_t i = 0; i < 5; ++i ) {
        AddJob(new Job);
    }
    // null to end the run.
    AddJob(NULL);

    printf("Done with jobs.\n");
    pthread_join(worker1, NULL);
    pthread_join(worker2, NULL);

    return 0;
}
like image 121
kfsone Avatar answered Nov 15 '22 00:11

kfsone