I am trying to understand the basic multithreading mechanisms in the new C++11 standard. The most basic example I can think of is a producer thread that pushes integers into a thread-safe queue and a consumer thread that pops them.
This example is also used in many textbooks about multithreading, and the communication part works fine. However, I have a problem when it comes to stopping the consumer thread.
I want the consumer to run until it gets an explicit stop signal (in most cases this means that I wait for the producer to finish so I can stop the consumer before the program ends). Unfortunately, C++11 threads lack an interrupt mechanism (which I know from multithreading in Java, for example). Thus, I have to use flags like isRunning to signal that I want a thread to stop.
The main problem now is: after I have stopped the producer thread, the queue is empty and the consumer is waiting on a condition_variable to get a signal when the queue is filled again. So I need to wake the thread up by calling notify_all() on the variable before exiting.
I have found a working solution, but it seems somewhat messy. The example code is listed below (I am sorry, but I couldn't reduce the code any further to get a truly minimal example):
The Queue class:
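#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>

class Queue{
public:
    Queue() : m_isProgramStopped{ false } { }

    void push(int i){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_q.push(i);
        m_cond.notify_one();
    }

    int pop(){
        std::unique_lock<std::mutex> lock(m_mtx);
        // Wake up when an item arrives or when the program is stopping.
        m_cond.wait(lock, [&](){ return !m_q.empty() || m_isProgramStopped; });
        if (m_isProgramStopped){
            // std::exception has no string constructor in standard C++.
            throw std::runtime_error("Program stopped!");
        }
        int x = m_q.front();
        m_q.pop();
        std::cout << "Thread " << std::this_thread::get_id() << " popped " << x << "." << std::endl;
        return x;
    }

    void stop(){
        {
            // pop() reads the flag under m_mtx, so it is written under m_mtx too.
            std::lock_guard<std::mutex> lock(m_mtx);
            m_isProgramStopped = true;
        }
        m_cond.notify_all();
    }

private:
    std::queue<int> m_q;
    std::mutex m_mtx;
    std::condition_variable m_cond;
    bool m_isProgramStopped;
};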
The Producer:
#include <chrono>
#include <thread>

class Producer{
public:
    Producer(Queue & q) : m_q{ q }, m_counter{ 1 } { }

    // Pushes five consecutive integers, one every 500 ms.
    void produce(){
        for (int i = 0; i < 5; i++){
            m_q.push(m_counter++);
            std::this_thread::sleep_for(std::chrono::milliseconds{ 500 });
        }
    }

    void execute(){
        m_t = std::thread(&Producer::produce, this);
    }

    void join(){
        m_t.join();
    }

private:
    Queue & m_q;
    std::thread m_t;
    unsigned int m_counter;
};
The Consumer:
#include <atomic>

class Consumer{
public:
    Consumer(Queue & q) : m_q{ q }, m_takeCounter{ 0 }, m_isRunning{ true }
    { }

    ~Consumer(){
        std::cout << "KILL CONSUMER! - TOOK: " << m_takeCounter << "." << std::endl;
    }

    void consume(){
        while (m_isRunning){
            try{
                m_q.pop();
                m_takeCounter++;
            }
            catch (const std::exception &){  // catch by reference to avoid slicing
                std::cout << "Program was stopped while waiting." << std::endl;
            }
        }
    }

    void execute(){
        m_t = std::thread(&Consumer::consume, this);
    }

    void join(){
        m_t.join();
    }

    void stop(){
        m_isRunning = false;
    }

private:
    Queue & m_q;
    std::thread m_t;
    unsigned int m_takeCounter;
    // Written by the main thread and read by the consumer thread, hence atomic.
    std::atomic<bool> m_isRunning;
};
And finally the main():
#include <cstdlib>

int main(void){
    Queue q;
    Consumer cons{ q };
    Producer prod{ q };
    cons.execute();
    prod.execute();
    prod.join();
    // Clear the consumer's run flag first, then wake it up through the queue.
    cons.stop();
    q.stop();
    cons.join();
    std::cout << "END" << std::endl;
    return EXIT_SUCCESS;
}
Is this the right way to end a thread that is waiting on a condition variable, or are there better methods? Currently, the queue needs to know if the program has stopped (which in my opinion destroys the loose coupling of the components), and I need to call stop() on the queue explicitly, which doesn't seem right.
Additionally, the condition variable, which should just be used as a signal for whether the queue is empty, now stands for another condition: whether the program has ended. If I am not mistaken, every time a thread waits on a condition variable for some event to happen, it would also have to check whether the thread has to be stopped before continuing its execution (which also seems wrong).
Do I have these problems because my entire design is faulty or am I missing some mechanisms that can be used to exit threads in a clean way?
std::condition_variable: The condition_variable class is a synchronization primitive that can be used to block a thread, or multiple threads at the same time, until another thread both modifies a shared variable (the condition) and notifies the condition_variable.
To wait for a thread, use the std::thread::join() function. This function makes the current thread wait until the thread identified by *this has finished executing.
Condition variables are used to wait for a particular condition to become true (e.g. characters in a buffer). In generic terms, wait(condition, lock) releases the lock and puts the thread to sleep until the condition is signaled; when the thread wakes up again, it re-acquires the lock before returning.
condition_variable::wait: wait causes the current thread to block until the condition variable is notified or a spurious wakeup occurs, optionally looping until some predicate is satisfied (bool(stop_waiting()) == true).
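The "optionally looping" part can be made concrete: the predicate overload of wait behaves like a loop around the plain wait, which is what protects against spurious wakeups. Here is a minimal sketch of that equivalence. The names `ready`, `wait_until_ready` and `demo_wait` are hypothetical and do not appear in the question.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex mtx;
std::condition_variable cond;
bool ready = false;  // the shared condition, guarded by mtx

// Blocks until `ready` is true.
// Behaves like cond.wait(lock, [] { return ready; }).
void wait_until_ready() {
    std::unique_lock<std::mutex> lock(mtx);
    while (!ready) {      // re-check after every wakeup, spurious or not
        cond.wait(lock);  // releases mtx while blocked, re-acquires on return
    }
}

// Hypothetical helper: starts a waiter, sets the condition, and joins.
bool demo_wait() {
    std::thread waiter(wait_until_ready);
    {
        std::lock_guard<std::mutex> lock(mtx);
        ready = true;     // modify the condition under the mutex
    }
    cond.notify_one();    // then notify the waiting thread
    waiter.join();        // wait for the waiter to finish executing
    return ready;
}
```

Because the condition is re-checked under the mutex, it does not matter whether the notification arrives before or after the waiter starts waiting.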
No, there's nothing wrong with your design, and it's the normal approach taken for this sort of problem.
It's perfectly valid for you to have multiple conditions (e.g. anything on the queue, or the program stopping) attached to a condition variable. The key thing is that every part of the condition is checked when the wait returns.
Instead of having a flag in Queue to indicate that the program is stopping, you should think of the flag as "can I accept items?". This is a better overall paradigm and works better in a multi-threaded environment.
Also, instead of having pop throw an exception if someone calls it after stop has been called, you could replace the method with bool try_pop(int &value), which returns true if a value was returned and false otherwise. This way the caller can check on failure whether the queue has been stopped (add a bool is_stopped() const method). Although exception handling works here, it's a bit heavy-handed, and a stopped queue isn't really an exceptional case in a multi-threaded program.
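As a rough sketch of that suggestion, the queue could look something like this. The class name `StoppableQueue` and the helper `demo_try_pop` are hypothetical. The sketch also assumes that items still in the queue should be drained after stop(), which differs from the question's pop(), where the exception is thrown as soon as the flag is set.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Sketch of the question's Queue with try_pop() instead of a throwing pop().
class StoppableQueue {
public:
    // Returns false if the queue no longer accepts items.
    bool push(int i) {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            if (m_stopped) return false;
            m_q.push(i);
        }
        m_cond.notify_one();
        return true;
    }

    // Blocks until an item is available or the queue is stopped.
    // Returns true and fills `value` if an item was popped,
    // false if the queue is stopped and empty.
    bool try_pop(int& value) {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cond.wait(lock, [this] { return !m_q.empty() || m_stopped; });
        if (m_q.empty()) return false;  // woken by stop() with nothing left
        value = m_q.front();
        m_q.pop();
        return true;
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_stopped = true;
        }
        m_cond.notify_all();  // wake every waiting consumer
    }

    bool is_stopped() const {
        std::lock_guard<std::mutex> lock(m_mtx);
        return m_stopped;
    }

private:
    std::queue<int> m_q;
    mutable std::mutex m_mtx;  // mutable so is_stopped() can lock it
    std::condition_variable m_cond;
    bool m_stopped = false;
};

// Hypothetical demo: returns how many items the consumer popped.
int demo_try_pop() {
    StoppableQueue q;
    int taken = 0;
    std::thread consumer([&] {
        int value = 0;
        while (q.try_pop(value)) ++taken;  // loop ends once the queue is stopped and empty
    });
    for (int i = 1; i <= 5; ++i) q.push(i);
    q.stop();
    consumer.join();
    return taken;
}
```

With this shape the consumer no longer needs its own isRunning flag or a try/catch block: the return value of try_pop is the loop condition, and the queue only exposes a property of itself rather than knowledge about the program.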
wait can also be called with a timeout (wait_for or wait_until). When the timeout expires, control returns to the thread, which can check the stop flag and then either wait for more items to be consumed or finish execution. A good introduction to multithreading with C++ is C++11 Concurrency.
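A minimal sketch of the timeout approach, assuming a 100 ms polling interval and hypothetical names (`items`, `stop_requested`, `consume_until_stopped`, `demo_timeout`) that are not taken from the question:

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

std::mutex q_mtx;
std::condition_variable q_cond;
std::queue<int> items;                    // guarded by q_mtx
std::atomic<bool> stop_requested{false};  // set by the controlling thread

// Consumer loop: wakes up at least every 100 ms to re-check the stop flag.
// Returns the number of items consumed.
int consume_until_stopped() {
    int taken = 0;
    std::unique_lock<std::mutex> lock(q_mtx);
    for (;;) {
        // wait_for returns the predicate's value: false means the timeout
        // expired while the queue was still empty.
        bool has_item = q_cond.wait_for(lock, std::chrono::milliseconds(100),
                                        [] { return !items.empty(); });
        if (has_item) {
            items.pop();
            ++taken;
        } else if (stop_requested) {
            break;  // timed out with an empty queue and a stop request
        }
    }
    return taken;
}

// Hypothetical demo: pushes three items, requests a stop, and joins.
int demo_timeout() {
    int taken = 0;
    std::thread consumer([&] { taken = consume_until_stopped(); });
    for (int i = 0; i < 3; ++i) {
        {
            std::lock_guard<std::mutex> lock(q_mtx);
            items.push(i);
        }
        q_cond.notify_one();
    }
    stop_requested = true;  // no notify needed: the next timeout picks it up
    consumer.join();
    return taken;
}
```

The trade-off is that the queue needs no stop flag of its own, but the consumer polls, so shutdown can be delayed by up to one timeout interval.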