I'm stuck on a problem when trying to wake one thread from another. It's a simple producer/consumer setup.
The code is below. Line 85 (marked "<-- the problem" in the code) is the point where I don't understand why it's not working. The producer thread fills a std::queue and calls std::condition_variable::notify_one(), while the consumer thread waits for !std::queue::empty().
Thanks in advance for any help.
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <string>
#include <iostream>
#include <thread>

// request
class request :
    public std::mutex,
    public std::condition_variable,
    public std::queue<std::string>
{
public:
    virtual ~request();
};

request::~request()
{
}

// producer
class producer
{
public:
    producer(request &);
    virtual ~producer();
    void operator()();
private:
    request & request_;
};

producer::producer(request & _request)
:
    request_(_request)
{
}

producer::~producer()
{
}

void
producer::operator()()
{
    while (true) {
        std::lock_guard<std::mutex> lock(request_);
        std::cout << "producer\n";
        request_.push("something");
        std::this_thread::sleep_for(std::chrono::seconds(1));
        request_.notify_one();
    }
}

// consumer
class consumer
{
public:
    consumer(request &);
    virtual ~consumer();
    void operator()();
private:
    request & request_;
};

consumer::consumer(request & _request)
:
    request_(_request)
{
}

consumer::~consumer()
{
}

void
consumer::operator()()
{
    while (true) {
        std::unique_lock<std::mutex> lock(request_); // <-- the problem
        std::cout << "consumer\n";
        request_.wait (
            lock, [this] {return !request_.empty();}
        );
        request_.pop();
    }
}

int
main()
{
    // request
    request request_;
    // producer
    std::thread producer_{producer(request_)};
    // consumer
    std::thread first_consumer_{consumer(request_)};
    std::thread second_consumer_{consumer(request_)};
    // join
    producer_.join();
    first_consumer_.join();
    second_consumer_.join();
}
std::condition_variable: The condition_variable class is a synchronization primitive that can be used to block a thread, or multiple threads at the same time, until another thread both modifies a shared variable (the condition) and notifies the condition_variable.
You can use std::promise/std::future as a simpler alternative to a bool / condition_variable / mutex in this case. A future is not susceptible to spurious wakeups and doesn't require a mutex for synchronisation.
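As a rough illustration of the promise/future alternative mentioned above, here is a minimal sketch. The helper name wait_for_signal is hypothetical and not from the original post. Note that a promise can be fulfilled only once, so this suits a one-shot notification rather than a repeatedly filled queue like the one in the question.

```cpp
#include <future>
#include <string>
#include <thread>

// Hypothetical helper (not from the original post): a worker thread blocks on
// a future until the calling thread fulfils the matching promise.
std::string wait_for_signal()
{
    std::promise<std::string> ready;                       // fulfilled once by the notifier
    std::future<std::string> signal = ready.get_future();  // read once by the worker
    std::string received;

    // The worker blocks in get() until set_value() has been called.
    std::thread worker([&signal, &received] {
        received = signal.get();
    });

    ready.set_value("something");  // hands over the value and wakes the worker
    worker.join();                 // after join, `received` is safe to read
    return received;
}
```

Calling wait_for_signal() from main() should return the string that was passed to set_value(). No mutex or predicate appears in the sketch because the shared state inside the promise/future pair does the synchronisation.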
A condition variable is a kind of event used for signaling between two or more threads. One or more threads can wait on it to be signaled, while another thread signals it. A mutex is required along with the condition variable.
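The pairing of a mutex, a shared condition and a condition variable can be sketched as follows. This is only an illustration under assumed names: signal_once, ready, payload and seen are hypothetical and do not appear in the original post.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: returns the value the waiting thread observed
// once it had been woken up.
int signal_once()
{
    std::mutex mx;
    std::condition_variable cv;
    bool ready = false;  // the shared condition, guarded by mx
    int payload = 0;     // data published together with the condition
    int seen = -1;       // what the waiter read; read by the caller after join()

    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(mx);
        // wait() releases mx while blocked and re-acquires it before returning.
        // The predicate protects against spurious wakeups and against a
        // notification that was sent before this thread started waiting.
        cv.wait(lock, [&] { return ready; });
        seen = payload;
    });

    {
        std::lock_guard<std::mutex> lock(mx);
        payload = 42;
        ready = true;
    }                    // release the mutex before notifying
    cv.notify_one();

    waiter.join();
    return seen;
}
```

The notifying side changes the condition while holding the mutex and notifies after releasing it, so the woken thread can acquire the mutex straight away.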
In Python's threading module, a condition variable (threading.Condition) likewise allows one or more threads to wait until they are notified by another thread. If the lock argument is given and not None, it must be a Lock or RLock object, and it is used as the underlying lock. Otherwise, a new RLock object is created and used as the underlying lock.
Fixed code below, with these changes:

- Flush cout (I used endl to do that) so the output is printed immediately; this makes it easier to see what's happening.
- Print "consumer" after waking, because that's when the consumer is consuming; otherwise you get misleading output showing when the consumer is sleeping, not when it gets work to do.

The main problem with your code was that the producer never gave the consumers a chance to run. It added to the queue, slept for a second (still holding the mutex lock), then notified the condition variable (still holding the mutex), then really quickly released the mutex lock and acquired it again. Probably what you saw is that a consumer thread got the notification, tried to acquire the mutex lock, found it was still locked (by the producer thread) and so went back to sleep. The producer never released the mutex long enough for another thread to acquire it.

You might have been able to get better results by adding a std::this_thread::yield() at the start of the producer loop, before locking the mutex, but algorithms that rely on yield() for correctness are generally broken (and indeed it makes no difference in my tests); it's better to fix the producer loop to give the consumers a chance to wake up and run.
Here's the working code:
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <string>
#include <iostream>
#include <thread>

// request: the mutex, condition variable and queue are plain members
struct request
{
    std::mutex mx;
    std::condition_variable cv;
    std::queue<std::string> q;
};

// producer
class producer
{
public:
    producer(request & r) : request_(r) { }
    void operator()();
private:
    request & request_;
};

void
producer::operator()()
{
    while (true) {
        {
            // hold the mutex only while touching the queue
            std::lock_guard<std::mutex> lock(request_.mx);
            std::cout << "producer" << std::endl;
            request_.q.push("something");
        }
        // the mutex is released here, so a consumer can acquire it
        std::this_thread::sleep_for(std::chrono::seconds(1));
        request_.cv.notify_one();
    }
}

// consumer
class consumer
{
public:
    consumer(request & r) : request_(r) { }
    void operator()();
private:
    request & request_;
};

void
consumer::operator()()
{
    while (true) {
        std::unique_lock<std::mutex> lock(request_.mx);
        // wait() releases the mutex while blocked and re-acquires it on wake-up
        request_.cv.wait (
            lock, [this] {return !request_.q.empty();}
        );
        std::cout << "consumer" << std::endl;
        request_.q.pop();
    }
}

int
main()
{
    // request
    request request_;
    // producer
    std::thread producer_{producer(request_)};
    // consumer
    std::thread first_consumer_{consumer(request_)};
    std::thread second_consumer_{consumer(request_)};
    // join
    producer_.join();
    first_consumer_.join();
    second_consumer_.join();
}
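
The code above loops forever, so the join() calls never return. If you want the threads to finish, one possible approach is to add a shutdown flag to the shared state and include it in the wait predicate. The sketch below is an assumption-laden variant, not part of the original fix: work_queue, done and run_pipeline are hypothetical names, and the sleep and the printing are left out to keep it short.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// Hypothetical variant of the request struct with a shutdown flag.
struct work_queue
{
    std::mutex mx;
    std::condition_variable cv;
    std::queue<std::string> q;
    bool done = false;  // set by the producer once no more items will arrive
};

// Pushes `items` entries, lets two consumers drain them, and returns
// how many entries were consumed in total.
std::size_t run_pipeline(std::size_t items)
{
    work_queue wq;
    std::size_t consumed = 0;  // guarded by wq.mx

    auto consume = [&] {
        while (true) {
            std::unique_lock<std::mutex> lock(wq.mx);
            // Wake up either for new work or for shutdown.
            wq.cv.wait(lock, [&] { return !wq.q.empty() || wq.done; });
            if (wq.q.empty())
                return;        // shutdown requested and nothing left to process
            wq.q.pop();
            ++consumed;
        }
    };

    std::thread first(consume);
    std::thread second(consume);

    // The calling thread acts as the producer.
    for (std::size_t i = 0; i < items; ++i) {
        {
            std::lock_guard<std::mutex> lock(wq.mx);
            wq.q.push("something");
        }                      // unlock before notifying
        wq.cv.notify_one();
    }
    {
        std::lock_guard<std::mutex> lock(wq.mx);
        wq.done = true;
    }
    wq.cv.notify_all();        // every consumer has to observe the shutdown

    first.join();
    second.join();
    return consumed;
}
```

A call such as run_pipeline(100) should return 100 once both consumers have exited. The consumers drain any remaining items before honouring the flag, which is why the check after wait() tests the queue rather than done.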