
Extend the life of threads with synchronization (C++11)

I have a program with a function that takes a pointer as its argument, and a main. The main creates n threads, each of which runs the function on a different memory area depending on the argument passed. The threads are then joined, the main performs some data mixing between the areas, and then creates n new threads which do the same operation as the old ones.

To improve the program I would like to keep the threads alive, avoiding the long time needed to create them. The threads should sleep while the main is working and be notified when they have to wake up again. In the same way, the main should wait while the threads are working, as it did with join.

I cannot come up with a robust implementation of this; I always end up in a deadlock.

Simple baseline code below; any hints about how to modify it would be much appreciated.

#include <thread>
#include <climits>

...

void myfunc(void * p) {
  do_something(p);
}

int main(){
  void * myp[n_threads] {a_location, another_location,...};
  std::thread mythread[n_threads];
  for (unsigned long int j=0; j < ULONG_MAX; j++) {
    for (unsigned int i=0; i < n_threads; i++) {
      mythread[i] = std::thread(myfunc, myp[i]);
    }
    for (unsigned int i=0; i < n_threads; i++) {
      mythread[i].join();
    }
    mix_data(myp); 
  }
  return 0;
}
DarioP asked Mar 06 '13 16:03



2 Answers

Here is a possible approach using only classes from the C++11 Standard Library. Basically, each thread you create has an associated queue of commands (encapsulated in std::packaged_task<> objects) which it continuously checks. If the queue is empty, the thread just waits on a condition variable (std::condition_variable).

Data races are avoided through the use of std::mutex and the std::unique_lock<> RAII wrapper. The main thread can wait for a particular job to finish by storing the std::future<> object associated with each submitted std::packaged_task<> and calling wait() on it.
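As a minimal sketch of the packaged_task/future handshake in isolation (the helper name run_job_and_wait and the value 42 are made up for illustration and are not part of the program below), waiting for a single job might look like this:

```cpp
#include <future>
#include <thread>
#include <utility>

// Hypothetical helper: runs one job on another thread and blocks until it is done.
int run_job_and_wait()
{
    int result = 0;

    // Wrap the work in a packaged_task; the lambda body is only a placeholder.
    std::packaged_task<void()> job([&result] { result = 42; });

    // Obtain the future before handing the task over.
    std::future<void> done = job.get_future();

    // packaged_task is move-only, so it is moved into the thread.
    std::thread worker(std::move(job));

    done.wait();    // returns once the task has been executed
    worker.join();
    return result;
}
```

The future becomes ready when the task returns, so reading result after done.wait() is safe without an extra lock.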

Below is a simple program that follows this design. Comments should be sufficient to explain what it does:

#include <thread>
#include <iostream>
#include <sstream>
#include <future>
#include <queue>
#include <vector>
#include <condition_variable>
#include <mutex>

// Convenience type definition
using job = std::packaged_task<void()>;

// Some data associated to each thread.
struct thread_data
{
    int id; // Could use thread::id, but this is filled before the thread is started
    std::thread t; // The thread object
    std::queue<job> jobs; // The job queue
    std::condition_variable cv; // The condition variable to wait for threads
    std::mutex m; // Mutex used for avoiding data races
    bool stop = false; // When set, this flag tells the thread that it should exit
};

// The thread function executed by each thread
void thread_func(thread_data* pData)
{
    std::unique_lock<std::mutex> l(pData->m, std::defer_lock);
    while (true)
    {
        l.lock();

        // Wait until the queue is non-empty or stop is signaled
        pData->cv.wait(l, [pData] () {
            return (pData->stop || !pData->jobs.empty()); 
            });

        // Stop was signaled, let's exit the thread
        if (pData->stop) { return; }

        // Pop one task from the queue...
        job j = std::move(pData->jobs.front());
        pData->jobs.pop();

        l.unlock();

        // Execute the task!
        j();
    }
}

// Function that creates a simple task
job create_task(int id, int jobNumber)
{
    job j([id, jobNumber] ()
    {
        std::stringstream s;
        s << "Hello " << id << "." << jobNumber << std::endl;
        std::cout << s.str();
    });

    return j;
}

int main()
{
    const int numThreads = 4;
    const int numJobsPerThread = 10;
    std::vector<std::future<void>> futures;

    // Create all the threads (will be waiting for jobs)
    thread_data threads[numThreads];
    int tdi = 0;
    for (auto& td : threads)
    {
        td.id = tdi++;
        td.t = std::thread(thread_func, &td);
    }

    //=================================================
    // Start assigning jobs to each thread...

    for (auto& td : threads)
    {
        for (int i = 0; i < numJobsPerThread; i++)
        {
            job j = create_task(td.id, i);
            futures.push_back(j.get_future());

            std::unique_lock<std::mutex> l(td.m);
            td.jobs.push(std::move(j));
        }

        // Notify the thread that there is work to do...
        td.cv.notify_one();
    }

    // Wait for all the tasks to be completed...
    for (auto& f : futures) { f.wait(); }
    futures.clear();


    //=================================================
    // Here the main thread does something...

    std::cin.get();

    // ...done!
    //=================================================


    //=================================================
    // Posts some new tasks...

    for (auto& td : threads)
    {
        for (int i = 0; i < numJobsPerThread; i++)
        {
            job j = create_task(td.id, i);
            futures.push_back(j.get_future());

            std::unique_lock<std::mutex> l(td.m);
            td.jobs.push(std::move(j));
        }

        // Notify the thread that there is work to do...
        td.cv.notify_one();
    }

    // Wait for all the tasks to be completed...
    for (auto& f : futures) { f.wait(); }
    futures.clear();

    // Send stop signal to all threads and join them...
    for (auto& td : threads)
    {
        std::unique_lock<std::mutex> l(td.m);
        td.stop = true;
        td.cv.notify_one();
    }

    // Join all the threads
    for (auto& td : threads) { td.t.join(); }
}
Andy Prowl answered Oct 02 '22 13:10


The concept you want is the thread pool. This SO question deals with existing implementations.

The idea is to have a container for a number of thread instances. Each instance is associated with a function which polls a task queue and, when a task is available, pulls it and runs it. Once the task is over (if it terminates, but that's another problem), the thread simply loops back to the task queue.

So you need a synchronized queue, a thread class which implements the loop on the queue, an interface for the task objects, and maybe a class to drive the whole thing (the pool class).
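The pieces listed above could be sketched roughly as follows. This is only an illustration under assumptions: thread_pool, submit, worker_loop and run_rounds are hypothetical names, the int "areas" stand in for the memory areas of the question, and the swap stands in for mix_data.

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool: a synchronized queue plus worker threads looping on it.
class thread_pool
{
public:
    explicit thread_pool(unsigned n)
    {
        for (unsigned i = 0; i < n; ++i)
            workers_.emplace_back([this] { worker_loop(); });
    }

    ~thread_pool()
    {
        {
            std::lock_guard<std::mutex> lock(m_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto& w : workers_) w.join();
    }

    // Queue a task; the returned future becomes ready once the task has run.
    std::future<void> submit(std::function<void()> f)
    {
        std::packaged_task<void()> task(std::move(f));
        std::future<void> done = task.get_future();
        {
            std::lock_guard<std::mutex> lock(m_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
        return done;
    }

private:
    void worker_loop()
    {
        for (;;)
        {
            std::packaged_task<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stop requested and nothing left to run
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run the task outside the lock
        }
    }

    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::packaged_task<void()>> tasks_;
    bool stop_ = false;
    std::vector<std::thread> workers_;
};

// Hypothetical driver mirroring the question: work on each area, wait, mix, repeat.
int run_rounds(int rounds)
{
    std::vector<int> areas(4, 0);
    thread_pool pool(4);  // threads are created once and reused for every round
    for (int r = 0; r < rounds; ++r)
    {
        std::vector<std::future<void>> pending;
        for (auto& a : areas)
            pending.push_back(pool.submit([&a] { ++a; }));  // stand-in for do_something(p)
        for (auto& f : pending) f.wait();                    // plays the role of join()
        std::swap(areas.front(), areas.back());              // stand-in for mix_data()
    }
    int total = 0;
    for (int a : areas) total += a;
    return total;
}
```

Calling run_rounds(5) from main runs five rounds on the same four threads. A single shared queue is used here for brevity, so any idle thread may pick up any area's task.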

Alternatively, you could make a very specialized thread class for the task it has to perform (with only the memory area as a parameter for instance). This requires a notification mechanism for the threads to indicate that they are done with the current iteration.

The thread's main function would be a loop on that specific task: at the end of each iteration, the thread signals that it is done and waits on a condition variable before starting the next one. In essence, you would be inlining the task code within the thread, dropping the need for a queue altogether.

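 #include <atomic>
 #include <condition_variable>
 #include <mutex>
 #include <thread>

 using namespace std;

 // semaphore class based on C++11 features
 class semaphore {
     private:
         mutex mMutex;
         condition_variable mCond;
         int mV;
     public:
         semaphore(int v): mV(v){}
         void signal(int count=1){
             unique_lock<mutex> lock(mMutex);
             mV+=count;
             mCond.notify_all();
         }
         void wait(int count = 1){
             unique_lock<mutex> lock(mMutex);
             // block until `count` units are available, then take them
             while (mV < count)
                 mCond.wait(lock);
             mV-= count;
         }
 };

template <typename Task>
class TaskThread {
     Task *mTask;
     semaphore *mSemStart, *mSemFinished;
     atomic<bool> mRunning;
     thread mThread; // declared last, so it starts after the other members are initialized
    public:
    TaskThread(Task *task, semaphore *start, semaphore *finish):
         mTask(task),
         mSemStart(start), mSemFinished(finish),
         mRunning(true),
         mThread(&TaskThread<Task>::psrun, this){}
    ~TaskThread(){ mThread.join(); }

    void run(){
        do {
             (*mTask)();
             mSemFinished->signal(); // tell main this iteration is done
             mSemStart->wait();      // sleep until main starts the next one
        } while (mRunning);
    }

   void finish() { // end the thread after the current loop
         mRunning = false;
   }
private:
    static void psrun(TaskThread<Task> *self){ self->run();}
 };

 class MyTask {
     public:
     MyTask(){}
    void operator()(){
        // some code here
     }
 };

int main(){
    MyTask task1;
    MyTask task2;
    // start begins at 0 so that no thread runs ahead of main
    semaphore start(0), finished(0);
    TaskThread<MyTask> t1(&task1, &start, &finished);
    TaskThread<MyTask> t2(&task2, &start, &finished);
    for (int i = 0; i < 10; i++){
         finished.wait(2);  // wait for both threads to finish the iteration
         // main can mix the data here
         start.signal(2);   // start the next iteration
         // note: with one shared start semaphore a fast thread may take both
         // units; a barrier or one semaphore per thread avoids this
    }
    finished.wait(2);       // wait for the last iteration
    t1.finish();
    t2.finish();
    start.signal(2);        // wake the threads so they see mRunning == false and exit
}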

The proposed (crude) implementation above relies on the Task type, which must provide operator() (i.e. a functor-like class). I said earlier that you could incorporate the task code directly in the thread function body, but since I don't know that code, I kept it as abstract as I could. There is one condition variable for the start of the threads and one for their end, both encapsulated in semaphore instances.

Seeing the other answer proposing the use of boost::barrier, I can only support this idea: replace my semaphore class with that class if possible, because it is better to rely on well-tested and maintained external code than on a self-implemented solution for the same feature set.
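To show what the barrier pattern would look like for this problem, here is a rough sketch. simple_barrier and run_with_barrier are hypothetical names, and the barrier is written out by hand only to expose the mechanism; in real code boost::barrier (or std::barrier, available from C++20) would normally take its place. The int "areas" and the swap are stand-ins for the question's memory areas and mix_data.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical reusable barrier, written out only to show the mechanism.
class simple_barrier
{
public:
    explicit simple_barrier(unsigned count)
        : threshold_(count), waiting_(0), generation_(0) {}

    // Blocks until `count` threads have called wait(), then releases them all.
    void wait()
    {
        std::unique_lock<std::mutex> lock(m_);
        const unsigned long gen = generation_;
        if (++waiting_ == threshold_)
        {
            waiting_ = 0;
            ++generation_;       // the last arrival opens the barrier
            cv_.notify_all();
        }
        else
        {
            cv_.wait(lock, [this, gen] { return gen != generation_; });
        }
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    const unsigned threshold_;
    unsigned waiting_;
    unsigned long generation_;
};

// Hypothetical driver: n workers plus main meet at the barrier twice per round.
int run_with_barrier(unsigned n, int rounds)
{
    std::vector<int> areas(n, 0);
    simple_barrier sync(n + 1);  // the workers and main
    std::vector<std::thread> workers;
    for (unsigned i = 0; i < n; ++i)
        workers.emplace_back([&areas, &sync, i, rounds] {
            for (int r = 0; r < rounds; ++r)
            {
                ++areas[i];   // stand-in for do_something(p)
                sync.wait();  // this round's work is done
                sync.wait();  // wait until main has finished mixing
            }
        });

    for (int r = 0; r < rounds; ++r)
    {
        sync.wait();                             // wait for all workers
        std::swap(areas.front(), areas.back());  // stand-in for mix_data()
        sync.wait();                             // release the workers
    }
    for (auto& w : workers) w.join();

    int total = 0;
    for (int a : areas) total += a;
    return total;
}
```

Because every thread must arrive before anyone is released, each worker runs exactly once per round, which a single shared counting semaphore does not guarantee.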

All in all, both approaches are valid, but the former gives up a tiny bit of performance in favor of flexibility. If the task to be performed takes a sufficiently long time, the management and queue synchronization cost becomes negligible.

Update: code fixed and tested. Replaced a simple condition variable by a semaphore.

didierc answered Oct 02 '22 12:10