Sync is unreliable using std::atomic and std::condition_variable

In a distributed job system written in C++11, I have implemented a fence (i.e. a thread outside the worker thread pool may ask to block until all currently scheduled jobs are done) using the following structure:

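#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>

struct fence
{
    std::atomic<std::size_t>                counter;       // threads that have not reached the fence yet
    std::mutex                              resume_mutex;  // mutex used for waiting on resume
    std::condition_variable                 resume;        // notified when counter reaches zero

    fence(std::size_t num_threads)
        : counter(num_threads)
    {}
};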

The code implementing the fence looks like this:

void task_pool::fence_impl(void *arg)
{
    auto f = (fence *)arg;
    if (--f->counter == 0)      // (1)
        // we have zeroed this fence's counter, wake up everyone that waits
        f->resume.notify_all(); // (2)
    else
    {
        // another thread may execute (1) and (2) anywhere between
        // this thread's (1) and its (3)
        std::unique_lock<std::mutex> lock(f->resume_mutex);
        f->resume.wait(lock);   // (3)
    }
}

This works very well if threads enter the fence over a period of time. However, if they enter it almost simultaneously, it sometimes seems to happen that between the atomic decrement (1) and the start of the wait on the condition variable (3), the thread yields its CPU time and another thread decrements the counter to zero (1) and notifies the condition variable (2). The first thread then waits forever at (3), because it starts waiting only after the notification has already been sent.
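The interleaving described above can be replayed deterministically on a single thread. This is a minimal sketch, assuming a fence of two participants A and B; the function name `replay_lost_wakeup` is hypothetical, and `wait_for()` with a timeout stands in for `wait()` so the demo terminates instead of hanging forever.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical single-thread replay of the lost wakeup.
// Returns true if A's wait at (3) timed out, i.e. B's notification was lost.
bool replay_lost_wakeup()
{
    std::atomic<std::size_t> counter(2);
    std::mutex resume_mutex;
    std::condition_variable resume;

    // Thread A at (1): 2 -> 1, so A is not last and will have to wait.
    bool a_is_last = (--counter == 0);
    // A is preempted here; thread B runs (1): 1 -> 0, so B is last...
    bool b_is_last = (--counter == 0);
    assert(!a_is_last && b_is_last);
    // ...and B executes (2) while nobody is blocked on the condition variable.
    resume.notify_all();

    // A resumes and reaches (3). A condition variable keeps no record of past
    // notifications, so this wait can only time out (barring a spurious wakeup).
    std::unique_lock<std::mutex> lock(resume_mutex);
    return resume.wait_for(lock, std::chrono::milliseconds(100))
           == std::cv_status::timeout;
}
```

The point of the replay is that a notification is not stored anywhere: unless the waiting thread checks some shared state under the mutex before blocking, a notify_all() that happens first is simply gone.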

A hack that makes it work is to put a 10 ms sleep just before (2), but that is unacceptable for obvious reasons.

Any suggestions on how to fix this in a performant way?

Asked by IneQuation on Jan 07 '14 at 21:01


Answer

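Your diagnosis is correct: this code can lose condition variable notifications in exactly the way you described. After one thread has decremented the counter but before it starts waiting on the condition variable, another thread may call notify_all(), so the first thread misses that notification. Locking the mutex in the waiting thread alone does not close this window, because the notifying thread never locks it.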

A simple fix is to hold the mutex while decrementing the counter and while notifying, and to re-check the counter in a loop while waiting:

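void task_pool::fence_impl(void *arg)
{
    auto f = static_cast<fence*>(arg);
    // Hold the mutex for both the decrement and the notification, so no
    // thread can notify between another thread's decrement and its wait.
    std::unique_lock<std::mutex> lock(f->resume_mutex);
    if (--f->counter == 0) {
        // Last thread in: wake everyone waiting on the fence.
        f->resume.notify_all();
    }
    else do {
        // wait() releases the mutex while blocked and reacquires it before
        // returning; loop because wakeups may be spurious.
        f->resume.wait(lock);
    } while(f->counter);
}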

In this case the counter need not be atomic, because it is only ever read or modified while resume_mutex is held.
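A self-contained sketch of the fixed fence, with the counter as a plain `std::size_t` as allowed above. The `task_pool` class is not shown in the question, so the names `arrive_and_wait` and `run_fence_demo` are hypothetical stand-ins; the demo starts all threads at roughly the same time, which is the situation that triggered the original bug.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical standalone fence using the fix above. The counter is a plain
// size_t: it is only read or written while resume_mutex is held.
struct fence
{
    std::size_t             counter;
    std::mutex              resume_mutex;
    std::condition_variable resume;

    explicit fence(std::size_t num_threads) : counter(num_threads)
    {
        assert(num_threads > 0);
    }

    // Each participating thread calls this once; it returns only after all
    // num_threads threads have called it.
    void arrive_and_wait()
    {
        std::unique_lock<std::mutex> lock(resume_mutex);
        if (--counter == 0) {
            resume.notify_all();           // last arrival, mutex still held
        } else {
            while (counter != 0)           // re-check after every wakeup
                resume.wait(lock);
        }
    }
};

// Hypothetical demo: returns how many threads, once past the fence, saw that
// every thread had already arrived (num_threads if the fence works).
std::size_t run_fence_demo(std::size_t num_threads)
{
    fence f(num_threads);
    std::atomic<std::size_t> arrived(0);
    std::atomic<std::size_t> saw_all(0);

    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < num_threads; ++i)
        workers.emplace_back([&] {
            ++arrived;
            f.arrive_and_wait();
            if (arrived.load() == num_threads)
                ++saw_all;
        });
    for (auto &w : workers)
        w.join();
    return saw_all.load();
}
```

For example, `run_fence_demo(16)` should return 16: every thread's increment of `arrived` happens before its decrement under the mutex, and every waiter reacquires that mutex after the last decrement, so all of them observe the full count.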

An added bonus (or penalty, depending on your point of view) of holding the mutex while notifying is predictable scheduling (from the POSIX pthread_cond_broadcast() specification):

The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().
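For comparison, a hedged sketch of the other placement: the counter is still changed under the mutex, but the last thread releases the mutex before calling notify_all(). Lost wakeups are still prevented because the state change happens under the mutex; what this gives up is the scheduling predictability quoted above, in exchange for woken waiters possibly not having to block immediately on a mutex the notifier still holds. The struct and `run_unlocked_notify_demo` are hypothetical illustrations, not the answer's code.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical variant: decrement under the mutex, notify after unlocking.
struct fence
{
    std::size_t             counter;
    std::mutex              resume_mutex;
    std::condition_variable resume;

    explicit fence(std::size_t n) : counter(n) { assert(n > 0); }

    void arrive_and_wait()
    {
        std::unique_lock<std::mutex> lock(resume_mutex);
        if (--counter == 0) {
            lock.unlock();
            // Every other thread decremented under the mutex and released it
            // only by blocking in wait(), so all of them are already waiting.
            // Caveat: this thread touches resume after unlocking, so the
            // fence object must outlive this call.
            resume.notify_all();
        } else {
            while (counter != 0)
                resume.wait(lock);
        }
    }
};

// Hypothetical demo: returns the counter after all threads have passed.
std::size_t run_unlocked_notify_demo(std::size_t num_threads)
{
    fence f(num_threads);
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < num_threads; ++i)
        workers.emplace_back([&f] { f.arrive_and_wait(); });
    for (auto &w : workers)
        w.join();
    return f.counter;   // no lock needed here: join() synchronizes
}
```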

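Regarding the while loop: std::condition_variable::wait() may also wake up spuriously, just like its POSIX counterpart, about which the pthread_cond_wait() specification says: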

Spurious wakeups from the pthread_cond_timedwait() or pthread_cond_wait() functions may occur. Since the return from pthread_cond_timedwait() or pthread_cond_wait() does not imply anything about the value of this predicate, the predicate should be re-evaluated upon such return.
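In C++ the re-check loop can also be written with the predicate overload of `std::condition_variable::wait`, which the standard specifies as equivalent to `while (!pred()) wait(lock);`. A minimal sketch, again with hypothetical names (`arrive_and_wait`, `run_two_thread_demo`):

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Hypothetical fence whose waiting branch uses the predicate overload.
struct fence
{
    std::size_t             counter;
    std::mutex              resume_mutex;
    std::condition_variable resume;

    explicit fence(std::size_t n) : counter(n) { assert(n > 0); }

    void arrive_and_wait()
    {
        std::unique_lock<std::mutex> lock(resume_mutex);
        if (--counter == 0)
            resume.notify_all();
        else
            // Same as: while (counter != 0) resume.wait(lock);
            resume.wait(lock, [this] { return counter == 0; });
    }
};

// Hypothetical two-thread check: returns the counter once both have passed.
std::size_t run_two_thread_demo()
{
    fence f(2);
    std::thread other([&f] { f.arrive_and_wait(); });
    f.arrive_and_wait();
    other.join();
    return f.counter;
}
```

Unlike the answer's do/while, the predicate version checks the counter before the first wait, but since the waiting branch is only entered after observing a non-zero counter under the mutex, the two behave the same here.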

Answered by Maxim Egorushkin on Oct 24 '22 at 09:10