Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Implementing a condition_variable to solve a multithreaded busy-wait

My program prints multiple lines of text to the console through the use of idle worker threads. The problem, however, is that the workers aren't waiting on previous workers to finish before printing the text, which results in text being inserted into the text of another worker thread, as seen in the picture below:

enter image description here

I need to fix this problem - known as the busy-wait problem - through the use of std::condition_variable. I've tried to implement the condition_variable in the code below, based on the example found at this link, and the following stackoverflow question has helped me, but not enough, because of my limited knowledge of C++ in general. So in the end I only ended up commenting everything back out, and I am now at a loss.

// threadpool.cpp
// Compile with:
// g++ -std=c++11 -pthread threadpool.cpp -o threadpool

#include <thread>
#include <mutex>
#include <iostream>
#include <vector>
#include <deque>

class ThreadPool; // forward declare
//std::condition_variable cv;
//bool ready = false;
//bool processed = false;

class Worker {
public:
    Worker(ThreadPool &s) : pool(s) { }
    void operator()();
private:
    ThreadPool &pool;
};

class ThreadPool {
public:
    ThreadPool(size_t threads);
    template<class F> void enqueue(F f);
    ~ThreadPool();
private:
    friend class Worker;

    std::vector<std::thread> workers;
    std::deque<std::function<void()>> tasks;

    std::mutex queue_mutex;
    bool stop;
};

void Worker::operator()()
{
    std::function<void()> task;
    while (true)
    {
        std::unique_lock<std::mutex> locker(pool.queue_mutex);
        //cv.wait(locker, [] {return ready; });

        if (pool.stop) return;
        if (!pool.tasks.empty())
        {
            task = pool.tasks.front();
            pool.tasks.pop_front();
            locker.unlock();
            //cv.notify_one();
            //processed = true;
            task();
        }
        else {
            locker.unlock();
            //cv.notify_one();
        }
    }
}

ThreadPool::ThreadPool(size_t threads) : stop(false)
{
    for (size_t i = 0; i < threads; ++i)
        workers.push_back(std::thread(Worker(*this)));
}

ThreadPool::~ThreadPool()
{
    stop = true; // stop all threads

    for (auto &thread : workers)
        thread.join();
}

template<class F>
void ThreadPool::enqueue(F f)
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    //cv.wait(lock, [] { return processed; });
    tasks.push_back(std::function<void()>(f));
    //ready = true;
}

int main()
{
    ThreadPool pool(4);

    for (int i = 0; i < 8; ++i) pool.enqueue([i]() { std::cout << "Text printed by worker " << i << std::endl; });

    std::cin.ignore();
    return 0;
}
like image 449
user3776022 Avatar asked Feb 02 '17 10:02

user3776022


People also ask

What is std :: condition_variable?

std::condition_variable The condition_variable class is a synchronization primitive used with a std::mutex to block one or more threads until another thread both modifies a shared variable (the condition) and notifies the condition_variable .

Can thread wait in multiple condition variables?

What is the best way to wait on multiple condition variables in C++11? You can't, and must redesign. One thread may wait on only one condition variable (and its associated mutex) at a time.

How does condition variable works C++?

Condition variables allow one to atomically release a held mutex and put the thread to sleep. Then, after being signaled, atomically re-acquire the mutex and wake up. You run into this, for example, in the producer/consumer problem.


1 Answers

Here's a working sample:

// threadpool.cpp
// Compile with:
// g++ -std=c++11 -pthread threadpool.cpp -o threadpool

#include <thread>
#include <mutex>
#include <iostream>
#include <vector>
#include <deque>
#include <atomic>

class ThreadPool; 

// forward declare
std::condition_variable ready_cv;
std::condition_variable processed_cv;
std::atomic<bool> ready(false);
std::atomic<bool> processed(false);

class Worker {
public:
    Worker(ThreadPool &s) : pool(s) { }
    void operator()();
private:
    ThreadPool &pool;
};

class ThreadPool {
public:
    ThreadPool(size_t threads);
    template<class F> void enqueue(F f);
    ~ThreadPool();
private:
    friend class Worker;

    std::vector<std::thread> workers;
    std::deque<std::function<void()>> tasks;

    std::mutex queue_mutex;
    bool stop;
};

void Worker::operator()()
{
    std::function<void()> task;

    // in real life you need a variable here like while(!quitProgram) or your
    // program will never return. Similarly, in real life always use `wait_for`
    // instead of `wait` so that periodically you check to see if you should
    // exit the program
    while (true)
    {
        std::unique_lock<std::mutex> locker(pool.queue_mutex);
        ready_cv.wait(locker, [] {return ready.load(); });

        if (pool.stop) return;
        if (!pool.tasks.empty())
        {
            task = pool.tasks.front();
            pool.tasks.pop_front();
            locker.unlock();
            task();
            processed = true;
            processed_cv.notify_one();
        }
    }
}

ThreadPool::ThreadPool(size_t threads) : stop(false)
{
    for (size_t i = 0; i < threads; ++i)
        workers.push_back(std::thread(Worker(*this)));
}

ThreadPool::~ThreadPool()
{
    stop = true; // stop all threads

    for (auto &thread : workers)
        thread.join();
}

template<class F>
void ThreadPool::enqueue(F f)
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    tasks.push_back(std::function<void()>(f));
    processed = false;
    ready = true;
    ready_cv.notify_one();
    processed_cv.wait(lock, [] { return processed.load(); });
}

int main()
{
    ThreadPool pool(4);

    for (int i = 0; i < 8; ++i) pool.enqueue([i]() { std::cout << "Text printed by worker " << i << std::endl; });

    std::cin.ignore();
    return 0;
}

Output:

Text printed by worker 0 
Text printed by worker 1 
Text printed by worker 2 
Text printed by worker 3 
Text printed by worker 4 
Text printed by worker 5 
Text printed by worker 6 
Text printed by worker 7

Why not to do this in production code

Since the assignment is to print strings in order, this code can't actually really be parallelized, and so we've contrived a way to make it work completely sequentially using the required Golden hammer of std::condition_variable. But at least we got rid of that darned busy wait!

In a real example, you'd want to process data or do the tasking in parallel, and synchronize just the output, and this structure is still not the right way to approach that if you were doing it from scratch.

What I changed and why

I used atomic bools for the conditions because they have deterministic behavior when shared among multiple threads. Not strictly necessary in all cases but a good practice none the less.

You should include an exit condition in the while(true) loop (e.g. a flag that is set by a SIGINT handler or something) or your program will never exit. It's just an assignment so we skipped it, but this is very important not to neglect in production code.

Maybe the assignment could be solved with one condition variable, but I'm not sure about that, and in any case it's better to use two because it is much more clear and readable what each one does. Basically, we wait for a task, then ask the enqueuer to wait until it's done, then tell it that it is in fact processed, we're ready for the next one. You were very much on the right track initially, but I think with two cv's it's more obvious what was going wrong.

Additionally, it's important to set the condition vars (ready and processed) before using notify().

I deleted locker.unlock() because the case is unnecessary. c++ std locks are RAII structures and so the lock will be unlocked when it goes out of scope, which is basically the very next line. It's best in general to avoid pointless branching as you make your program unnecessarily stateful.

Pedagogical rant...

Now that the problem at hand has been addressed and solved, I have a few things that I think need to be said about the assignment in general and which I think are probably going to be more important for your learning than solving the problem as stated.

If you were confused or frustruated by the assignment, then good, you should be. It makes sense that you're having a hard time putting a square peg into a round hole, and I think the real value in this problem is learning to tell when you are using the right tool for the right job and when you aren't.

Condition variables are the right tool to solve the busy-loop problem, however this assignment (as pointed out by @n.m.) is a simple race condition. That said, it's only a simple race condition because it includes an unecessary and poorly implemented thread-pool, making the problem complex and difficult to comprehend for absolutely no purpose. And that said, std::async should be prefered over hand-rolled thread pools in modern c++ anyway (it's both way easier to implement correctly, and more performant on many platforms, and doesn't necessitate a bunch of globals and singletons and exclusively allocated resources).

If this were an assignment from your boss instead of your professor, this is what you would turn in:

for(int i = 0; i < 8; ++i)
{
    std::cout << "Text printed by worker " << i << std::endl;
}

This problem is solved (optimally) by a simple for loop. The busy wait/locking problems are a result of having a terrible design, and the "right" thing to do is fix the design, not bandage it. I don't even think the assignment is instructive, because there's no possible way or reason to parallelize the output, so it just ends up being confusing to everyone, including the SO community. It seems like negative training that threads just introduce needless complexity without improving the calculation.

It's hard to actually tell if the professor himself understands the concepts of threading and condition variables very well from the structure of the assignment. Assignments by necessity have to be boiled-down, simplified, and somewhat trivialized for training purposes, but that's actually the opposite of what was done here, where a complex problem was made out of a simple one.

As a rule I never answer homework related questions on SO, because I think giving away answers impedes learning, and that a developers most valuable skill is learning how to beat their head against a wall until an idea pops into it. However, there's nothing but negative training to be had from contrived assignments like this one, and although in school you have to play by the professors rules, it's important to learn to recognize contrived problems when you see them, deconstruct them, and come to the simple and correct solution.

like image 76
Nicolas Holthaus Avatar answered Nov 14 '22 23:11

Nicolas Holthaus