
Shutdown boost threads correctly

I have x Boost threads running at the same time. One producer thread fills a synchronised queue with calculation tasks. The consumer threads pop tasks off the queue and calculate them.

[Image: synchronised queue. Source: https://www.quantnet.com/threads/c-multithreading-in-boost.10028/]

The user may quit the program during this process, so I need to shut down my threads properly. My current approach does not seem to work, since exceptions are thrown. It is intended that on system shutdown all processes are killed and stop their current task, no matter what they are doing. Could you please show me how you would kill those threads?

Thread Initialisation:

    for (int i = 0; i < numberOfThreads; i++)
    {
        std::thread* thread = new std::thread(&MyManager::worker, this);
        mThreads.push_back(thread);
    }

Thread Destruction:

void MyManager::shutdown()
{
    for (int i = 0; i < numberOfThreads; i++)
    {
        mThreads.at(i)->join();
        delete mThreads.at(i);
    }
    mThreads.clear();
}

Worker:

void MyManager::worker()
{
    while (true)
    {

        int current = waitingList.pop();
        Object * p = objects.at(current);
        p->calculateMesh(); //this task is internally locked by a mutex

        try
        {
            boost::this_thread::interruption_point();
        }
        catch (const boost::thread_interrupted&)
        {
            // Thread interruption request received, break the loop
            std::cout << "- Thread interrupted. Exiting thread." << std::endl;
            break;
        }
    }
}

Synchronised Queue:

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

template <typename T>
class ThreadSafeQueue
{
public:

    T pop()
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        while (queue_.empty())
        {
            cond_.wait(mlock);
        }
        auto item = queue_.front();
        queue_.pop();

        return item;
    }

    void push(const T& item)
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        queue_.push(item);
        mlock.unlock();
        cond_.notify_one();
    }


    int sizeIndicator()
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        return queue_.size();
    }


private:

    bool isEmpty() {
        std::unique_lock<std::mutex> mlock(mutex_);
        return queue_.empty();
    }

    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};

The thrown error call stack:

... std::_Mtx_lockX(_Mtx_internal_imp_t * * _Mtx) Line 68   C++
... std::_Mutex_base::lock() Line 42    C++
... std::unique_lock<std::mutex>::unique_lock<std::mutex>(std::mutex & _Mtx) Line 220   C++
... ThreadSafeQueue<int>::pop() Line 13 C++
... MyManager::worker() Line 178   C++
Asked by Anthea, Jun 11 '15 09:06


1 Answer

In my experience working with threads in both Boost and Java, trying to shut down threads externally is always messy. I've never been able to get that to work cleanly.

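The best approach I've found is a boolean flag, visible to all the consumer threads, that starts out true. When you set it to false, the threads simply return on their own. In your case, it could easily serve as the condition of the while loop you already have. The flag should be a std::atomic<bool> (or be guarded by a mutex), because reading and writing a plain bool from different threads is a data race.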

On top of that, you need some synchronization so that you can wait for the threads to return before you delete them; otherwise you can get behavior that is hard to pin down.
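The flag-plus-wait idea described above can be sketched roughly as follows. This is only a minimal illustration using std::thread; FlagPool, running_ and iterations_ are hypothetical names that appear in neither the question nor my project, and the sleep stands in for real work.

```cpp
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>

// Hypothetical pool (not the asker's MyManager): workers poll a shared atomic flag.
class FlagPool {
public:
    explicit FlagPool(unsigned n) {
        for (unsigned i = 0; i < n; ++i)
            threads_.emplace_back([this] { worker(); });
    }

    // Ask every worker to stop, then wait for each one before cleaning up.
    void shutdown() {
        running_ = false;
        for (auto& t : threads_)
            if (t.joinable()) t.join();
        threads_.clear();
    }

    // Joining in the destructor ensures no worker outlives the members it uses.
    ~FlagPool() { shutdown(); }

    int iterations() const { return iterations_.load(); }

private:
    void worker() {
        while (running_) {
            ++iterations_;  // stand-in for one unit of real work
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    // Declared before threads_ so both exist before any worker starts.
    std::atomic<bool> running_{true};
    std::atomic<int> iterations_{0};
    std::vector<std::thread> threads_;
};
```

Storing the threads by value in a std::vector avoids the manual new/delete, and because shutdown() joins before clearing, nothing is destroyed while a worker is still running. Note that a worker only notices the flag between iterations, so a long-running task still finishes its current step first.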

An example from a past project of mine:

Thread creation

barrier = new boost::barrier(numOfThreads + 1);
threads = new detail::updater_thread*[numOfThreads];

for (unsigned int t = 0; t < numOfThreads; t++) {
    //This object is just a wrapper class for the boost thread.
    threads[t] = new detail::updater_thread(barrier, this);
}

Thread destruction

for (unsigned int i = 0; i < numOfThreads; i++) {
    threads[i]->requestStop();//Notify all threads to stop.
}

barrier->wait();//The update request will allow the threads to get the message to shutdown.

for (unsigned int i = 0; i < numOfThreads; i++) {
    threads[i]->waitForStop();//Wait for all threads to stop.
    delete threads[i];//Now we are safe to clean up.
}

Some methods that may be of interest from the thread wrapper.

//Constructor
updater_thread::updater_thread(boost::barrier * barrier)
{
   this->barrier = barrier;
   running = true;

   thread = boost::thread(&updater_thread::run, this);
}

void updater_thread::run() {
    while (running) {
        barrier->wait();
        if (!running) break;

        //Do stuff

        barrier->wait();
    }
}

void updater_thread::requestStop() {
    running = false;
}

void updater_thread::waitForStop() {
    thread.join();
}
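One caveat when applying this to the queue in the question: a consumer that is blocked inside pop() on an empty queue never gets to recheck a flag, so the queue itself needs a way to wake it up. Also, std::thread has no interrupt() member, so the interruption_point() in the question's worker is unlikely to ever fire for threads created that way. A minimal sketch of one possible fix, assuming a hypothetical ClosableQueue with a close() method (these names are mine, not part of Boost or of the question's code):

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

template <typename T>
class ClosableQueue {
public:
    // Blocks until an item is available or the queue is closed.
    // Returns false once closed; pending items are deliberately dropped.
    bool pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return closed_ || !queue_.empty(); });
        if (closed_) return false;
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (closed_) return;  // ignore work submitted after shutdown
            queue_.push(std::move(item));
        }
        cond_.notify_one();
    }

    // Wakes every blocked consumer so it can leave its loop.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        cond_.notify_all();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
    bool closed_ = false;  // guarded by mutex_
};

// Hypothetical demo: consumers block in pop(), then close() releases them all.
// Returns the number of consumers that exited their loop.
inline int runAndClose(unsigned workers) {
    ClosableQueue<int> queue;
    std::atomic<int> exited{0};
    std::vector<std::thread> threads;
    for (unsigned i = 0; i < workers; ++i)
        threads.emplace_back([&] {
            int task;
            while (queue.pop(task)) { /* p->calculateMesh() would go here */ }
            ++exited;
        });
    queue.close();
    for (auto& t : threads) t.join();  // returns because close() woke everyone
    return exited.load();
}
```

In the question's shutdown(), the equivalent step would be to call close() on waitingList before the join loop. The queue must also outlive the joined threads, otherwise a worker can end up locking an already destroyed mutex, which would be consistent with the call stack shown in the question.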

 

Answered by That Crazy Carl Guy, Oct 28 '22 12:10