Using C++11 std::thread and std::mutex, I'm writing a simple worker thread.
However, I ran into a strange hang when locking std::mutex: it looks like both threads (the main thread and the worker thread) are trying to lock the mutex, but both are blocked.
Here's the full code:
#include <thread>
#include <condition_variable>
#include <memory>
#include <iostream>
#include <list>
std::condition_variable cv;
std::mutex m;
std::thread t;
bool shouldExit = false;
std::list<int> jobs;
void thread_func()
{
    std::unique_lock<std::mutex> lock(m);
    while (!shouldExit) {
        while (jobs.empty() && !shouldExit) {
            cv.wait(lock);
        }
        // Do some stuff
        if (jobs.empty()) {
            continue;
        }
        // Get a job and do something with it
        if (!lock.owns_lock()) {
            lock.lock(); // <<<< Worker thread hang here
        }
        auto j = std::move(jobs.front());
        jobs.pop_front();
        lock.unlock();
        std::cout << "Do something with job " << j << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}
int main()
{
    t = std::thread(thread_func);
    for (int i = 1; i < 100; ++i) {
        std::cout << "Push to job " << i << std::endl;
        {
            std::lock_guard<std::mutex> lock(m); // <<<< main thread hang here
            jobs.push_back(i);
            cv.notify_one();
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    // To wait for thread exit
    shouldExit = true;
    cv.notify_one();
    t.join();
    return 0;
}
I'm compiling the code with the command below on Ubuntu 14.04:
g++ -std=c++11 -g -O0 -pthread -o testthread testthread.cpp
The output typically looks like this, and then the program hangs:
$ ./testthread
Push to job 1
Do something with job 1
Push to job 2
Do something with job 2
Push to job 3
Push to job 4
The interesting part is that when I move the 1 ms sleep in the main thread inside the lock_guard scope, as below, the issue is gone.
for (int i = 1; i < 100; ++i) {
    std::cout << "Push to job " << i << std::endl;
    {
        std::lock_guard<std::mutex> lock(m);
        jobs.push_back(i);
        cv.notify_one();
        std::this_thread::sleep_for(std::chrono::milliseconds(1)); // Moved into lock_guard
    }
}
I could not figure out why. Could you help explain the behavior of the code and what I did wrong?
[Update] I know that rewriting the worker thread in a certain way could fix the issue, but I would still like to know what exactly happens in the original code when both threads are trying to lock the mutex and both are blocked.
Mutexes are used to protect shared resources. If a mutex is already locked by another thread, a thread that tries to lock it waits until the mutex becomes available. The thread that locks a mutex becomes its owner and remains the owner until that same thread unlocks it.
If a thread that already owns a std::mutex tries to lock it again, the behavior is undefined; in practice the thread usually ends up waiting on a mutex that only it can release, which is a deadlock.
Secondly, some std::mutex implementations may spin for a short time before falling back to a system call when the lock cannot be obtained immediately, because blocking in the kernel is comparatively slow.
A separate problem arises if two threads need the same two resources but lock the associated mutexes in different orders. For example, if one thread locks mutex 1 and the other locks mutex 2, a deadlock occurs when each then attempts to lock the other mutex.
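To illustrate the lock-ordering point, here is a minimal sketch (not taken from the question's code) in which two threads name the same two mutexes in opposite orders but acquire them through std::lock, which is specified to avoid deadlock. The names m1, m2, balance_a, balance_b and run_transfers are hypothetical and exist only for this example.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

std::mutex m1, m2;                      // hypothetical mutexes guarding two resources
int balance_a = 100, balance_b = 100;   // hypothetical shared data

// Moves one unit from a to b, n times. Both mutexes are taken together.
void transfer_ab(int n) {
    for (int i = 0; i < n; ++i) {
        std::unique_lock<std::mutex> l1(m1, std::defer_lock);
        std::unique_lock<std::mutex> l2(m2, std::defer_lock);
        std::lock(l1, l2);              // acquires both without risking deadlock
        --balance_a;
        ++balance_b;
    }
}

// Moves one unit from b to a, n times, naming the mutexes in the opposite order.
void transfer_ba(int n) {
    for (int i = 0; i < n; ++i) {
        std::unique_lock<std::mutex> l2(m2, std::defer_lock);
        std::unique_lock<std::mutex> l1(m1, std::defer_lock);
        std::lock(l2, l1);              // the argument order does not matter to std::lock
        ++balance_a;
        --balance_b;
    }
}

// Runs both directions concurrently and waits for them to finish.
void run_transfers(int n) {
    std::thread t1(transfer_ab, n);
    std::thread t2(transfer_ba, n);
    t1.join();
    t2.join();
    assert(balance_a + balance_b == 200);   // the total is preserved
}
```

Calling run_transfers(10000) from main should return with both balances back at 100. If each function instead locked its two mutexes one after the other with plain lock() calls in opposite orders, the two threads could block each other forever.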
It is undefined behavior to call cv.wait(lock) when lock does not own the mutex. Add this assert (it needs #include <cassert>):
    while (!shouldExit) {
        assert(lock.owns_lock()); // <------ add this
        while (jobs.empty() && !shouldExit) {
            cv.wait(lock);
        }
libc++ will throw from the wait if !lock.owns_lock(), but I don't know what other implementations will do.
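As a small illustration of the precondition, the sketch below shows how owns_lock() tracks the state of a std::unique_lock across unlock() and lock(). The function name ownership_tracks_lock_state is made up for this example. Because waiting on an unlocked lock is undefined behavior, the sketch deliberately never does it and only marks the point where it would be illegal.

```cpp
#include <cassert>
#include <mutex>

// Returns true if owns_lock() reports the expected state at every step.
bool ownership_tracks_lock_state() {
    std::mutex m;
    std::unique_lock<std::mutex> lock(m);
    bool after_ctor = lock.owns_lock();     // true: the constructor locked m

    lock.unlock();
    bool after_unlock = lock.owns_lock();   // false: cv.wait(lock) here would be undefined behavior

    lock.lock();
    bool after_relock = lock.owns_lock();   // true: waiting on a condition variable is legal again

    assert(after_relock);
    return after_ctor && !after_unlock && after_relock;
}
```

In the question's loop, the unlock() at the end of one iteration is followed by cv.wait(lock) at the top of the next whenever the job list is empty, which is exactly the middle state above.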
Your code has several serious, classic bugs.
First, see the numbered comments in the annotated code; I refer to them below.
void thread_func()
{
    std::unique_lock<std::mutex> lock(m); // <---- {1}
    while (!shouldExit) { // <---- {2}
        while (jobs.empty() && !shouldExit) { // <---- {3}
            cv.wait(lock);
        }
        // Do some stuff
        if (jobs.empty()) {
            continue;
        }
        if (!lock.owns_lock()) {
            lock.lock(); // <---- {4}
        }
        auto j = std::move(jobs.front());
        jobs.pop_front();
        lock.unlock(); // <---- {5}
        std::cout << "Do something with job " << j << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}
{1} This is good, but you defeat its purpose in {5}.
{2} shouldExit should be a std::atomic<bool> (or be read and written only while holding m). Otherwise you have a data race.
{3} At some point in execution, this condition is tested without holding the lock, because of the unlock statement in {5}. Hence you have yet another race condition, and cv.wait(lock) is then called with lock unlocked.
{4} With the mutex unlocked, the main thread can acquire it between the time you test owns_lock() and the time you call lock(), so this call may block. After a wait that was entered without the lock held, the state of the mutex is no longer reliable, and the worker can block here forever.
{5} This leaves the mutex unlocked for the next iteration of the loop, which leads to serious race conditions and the deadlock.
Just add lock.lock() at the end of the loop body in thread_func(), like this:
void thread_func()
{
    // ..... more code omitted
    // ........
        lock.unlock();
        std::cout << "Do something with job " << j << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        lock.lock(); // YOUR NEW LINE
    }
}
The addition restores the loop to its original state, with the mutex locked on entry. Note that there is another code path back to the top of the loop, via the continue statement. Whenever std::condition_variable::wait() returns, the lock has been reacquired, so the invariant is still maintained on that path.
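For reference, here is a self-contained sketch of the worker with that one-line fix applied, so the invariant "the lock is held at the top of the loop" can be checked with an assert. It is an illustration under some assumptions, not the original program: the printing is dropped, shouldExit is written under the mutex, and the names processed and run_fixed_worker are hypothetical additions for this example.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <list>
#include <mutex>
#include <thread>
#include <utility>

std::condition_variable cv;
std::mutex m;
bool shouldExit = false;    // in this sketch, only touched while holding m
std::list<int> jobs;
int processed = 0;          // hypothetical counter, guarded by m

void thread_func() {
    std::unique_lock<std::mutex> lock(m);
    while (!shouldExit) {
        assert(lock.owns_lock());               // loop invariant
        while (jobs.empty() && !shouldExit) {
            cv.wait(lock);                      // legal: the lock is held here
        }
        if (jobs.empty()) {
            continue;                           // woken only to exit
        }
        auto j = std::move(jobs.front());
        jobs.pop_front();
        lock.unlock();                          // do the slow work without the mutex
        (void)j;                                // stand-in for real work on the job
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        lock.lock();                            // the new line: restore the invariant
        ++processed;
    }
}

// Pushes n jobs, asks the worker to exit, and returns processed + leftover jobs.
int run_fixed_worker(int n) {
    std::thread t(thread_func);
    for (int i = 1; i <= n; ++i) {
        {
            std::lock_guard<std::mutex> guard(m);
            jobs.push_back(i);
            cv.notify_one();
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    {
        std::lock_guard<std::mutex> guard(m);
        shouldExit = true;                      // published under the mutex
    }
    cv.notify_one();
    t.join();
    return processed + static_cast<int>(jobs.size());
}
```

The return value counts jobs that were processed plus any still queued when the exit flag was seen, so it should always equal n. As in the original, a job pushed just before shouldExit is set may be left unprocessed.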
Now your code works, but it still smells.
std::cout is thread-safe, but output from different threads isn't synchronized, so you may see interleaved characters.
Setting the std::cout problem aside, how do you do this properly? Check this code (and see the comments):
void thread_func()
{
    std::unique_lock<std::mutex> lock(m);
    while (true) // the exit test now lives in the wait predicate below
    {
        // Blocks until there is a job or exit was requested; the lock is held on return
        cv.wait(lock, []{ return !jobs.empty() || shouldExit; });
        if (jobs.empty())
            break; // exit requested and no jobs left
        // Do some stuff
        auto j = std::move(jobs.front());
        jobs.pop_front();
        lock.unlock(); // release the mutex while doing the slow work
        //cout is thread-safe but not synchronized
        //std::cout << "Do something with job " << j << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        lock.lock(); // re-acquire before the next wait
    }
}
In most common cases I know of, it's better to test your "ready to proceed" condition inside std::condition_variable::wait() by passing it as the predicate.
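As a minimal sketch of why the predicate form is convenient: cv.wait(lock, pred) behaves like while (!pred()) cv.wait(lock);, so it handles spurious wakeups and also returns immediately if the condition already holds before the wait starts. The names ready, payload and wait_for_payload below are hypothetical and not part of the question's code.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex m;
std::condition_variable cv;
bool ready = false;     // hypothetical "ready to proceed" flag, guarded by m
int payload = 0;        // hypothetical data, guarded by m

// Starts a producer thread, waits for its data, and returns it.
int wait_for_payload() {
    std::thread producer([] {
        {
            std::lock_guard<std::mutex> guard(m);
            payload = 42;
            ready = true;               // changed while holding the mutex
        }
        cv.notify_one();
    });

    int result;
    {
        std::unique_lock<std::mutex> lock(m);
        // Equivalent to: while (!ready) cv.wait(lock);
        cv.wait(lock, [] { return ready; });
        result = payload;               // the lock is held again here
    }
    producer.join();
    return result;
}
```

This works whether the producer runs before or after the consumer reaches the wait, because the predicate is checked under the mutex before blocking.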
To put it all together, here is a better version:
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <memory>
#include <iostream>
#include <list>
#include <atomic>
#include <utility>
std::condition_variable cv;
std::mutex m;
std::mutex mxa; // for std::cout locking
std::thread t;
std::atomic<bool> shouldExit{false};
std::list<int> jobs;
namespace detail
{
    std::ostream& safe_print()
    {
        return std::cout;
    }
    template<typename T, typename... Args>
    std::ostream& safe_print(T&& t, Args&&... args)
    {
        std::cout << t;
        return safe_print(std::forward<Args>(args)...);
    }
}
template<typename... Args>
std::ostream& println(Args&&... args)
{
    std::lock_guard<std::mutex> lck(mxa);
    auto&& x = detail::safe_print(std::forward<Args>(args)...);
    std::cout << std::endl;
    return x;
}
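void thread_func()
{
    std::unique_lock<std::mutex> lock(m);
    for (;;)
    {
        // Wait (with the lock held) until there is a job or exit was requested
        cv.wait(lock, []{ return !jobs.empty() || shouldExit; });
        if (jobs.empty())
            break; // exit requested and nothing left to do
        // Do some stuff
        auto j = std::move(jobs.front());
        jobs.pop_front();
        lock.unlock(); // don't hold the mutex while working
        //std::cout << "Do something with job " << j << std::endl;
        println("Do something with job ", j);
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        lock.lock(); // re-acquire before waiting again
    }
}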
int main()
{
    shouldExit = false;
    //safe_print("This is really funny ", 43, '\n');
    t = std::thread(thread_func);
    for (int i = 1; i < 100; ++i)
    {
        //std::cout << "Push to job " << i << std::endl;
        println("Push to Job ", i);
        {
            std::lock_guard<std::mutex> lock(m); // <<<< main thread doesn't hang here again
            jobs.push_back(i);
            cv.notify_one();
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    // To wait for thread exit
    {
        std::lock_guard<std::mutex> lock(m); // set the flag under the mutex so the worker cannot miss the wakeup
        shouldExit = true;
    }
    cv.notify_one();
    t.join();
    return 0;
}