
boost condition variable issue

The following minimal sample, cut down from a larger program, sends commands from client threads to an asio io_service object. The io_service object (in the Ios class) is run by a single thread. After sending a command, the client thread waits until the Ios object notifies it (via Cmd::NotifyFinish()) that the command has completed.

This sample seems to run fine on Ubuntu 11.04 with Boost 1.46, but on Windows 7 with Boost 1.46 it asserts.

I suspect it is something to do with the lock in Cmd::NotifyFinish(). When I move the lock out of the nested scope, so that waitConditionVariable_.notify_one() is called while the lock is still held, it doesn't crash on Windows 7. However, the boost::thread documentation states that notify_one() doesn't need to be called within the lock.

The stack trace (below) shows it is asserting when notify_one() is called. It is as though the cmd object has disappeared before notify is called...

How do I make this thread safe and not assert?

#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/bind.hpp>
#include <iostream>

class Cmd
{
public:
    Cmd() :   cnt_(0), waitPred_(false), waiting_(false)
    {
    }
    virtual ~Cmd()
    {
    }
    void BindInfo(int CmdSeq)
    {
        cnt_ = CmdSeq;
    }
    void NotifyFinish()
    {
        // called by the service thread
        {
            boost::mutex::scoped_lock lock(waitMutex_);
            waitPred_ = true;
            if (!waiting_)
            {
                // don't need to notify as isn't waiting
                return;
            }
        }
        waitConditionVariable_.notify_one();
    }
    void Wait()
    {
        // called by worker threads
        boost::mutex::scoped_lock lock(waitMutex_);
        waiting_ = true;
        while (!waitPred_)
            waitConditionVariable_.wait(lock);
    }
    int cnt_;
private:

    boost::mutex waitMutex_;
    boost::condition_variable waitConditionVariable_;
    bool waitPred_;
    bool waiting_;
};


class Ios
{
public:
    Ios() : timer_(ios_), cnt_(0), thread_(boost::bind(&Ios::Start, this))
    {
    }
    void Start()
    {
        timer_.expires_from_now(boost::posix_time::seconds(5));
        timer_.async_wait(boost::bind(&Ios::TimerHandler, this, _1));
        ios_.run();
    }
    void RunCmd(Cmd& C)
    {
        ios_.post(boost::bind(&Ios::RunCmdAsyn, this, boost::ref(C)));
    }

private:
    void RunCmdAsyn(Cmd& C)
    {
        C.BindInfo(cnt_++);
        C.NotifyFinish();
    }
    void TimerHandler(const boost::system::error_code& Ec)
    {
        if (!Ec)
        {
            std::cout << cnt_ << "\n";
            timer_.expires_from_now(boost::posix_time::seconds(5));
            timer_.async_wait(boost::bind(&Ios::TimerHandler, this, _1));
        }
        else
            exit(0);
    }

    boost::asio::io_service ios_;
    boost::asio::deadline_timer timer_;
    int cnt_;
    boost::thread thread_;
};

static Ios ios;

void ThreadFn()
{
    while (1)
    {
        Cmd c;
        ios.RunCmd(c);
        c.Wait();
        //std::cout << c.cnt_ << "\n";
    }
}

int main()
{
    std::cout << "Starting\n";
    boost::thread_group threads;
    const int num = 5;

    for (int i = 0; i < num; i++)
    {
        // Worker threads
        threads.create_thread(ThreadFn);
    }
    threads.join_all();

}

Stack trace:

msvcp100d.dll!std::_Debug_message(const wchar_t * message, const wchar_t * file, unsigned int line)  Line 15    C++
iosthread.exe!std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >::_Compat(const std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > & _Right)  Line 238 + 0x17 bytes   C++
iosthread.exe!std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >::operator==(const std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > & _Right)  Line 203 C++
iosthread.exe!std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >::operator!=(const std::_Vector_const_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > & _Right)  Line 208 + 0xc bytes C++
iosthread.exe!std::_Debug_range2<std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > >(std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _First, std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _Last, const wchar_t * _File, unsigned int _Line, std::random_access_iterator_tag __formal)  Line 715 + 0xc bytes  C++
iosthread.exe!std::_Debug_range<std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > >(std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _First, std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _Last, const wchar_t * _File, unsigned int _Line)  Line 728 + 0x6c bytes    C++
iosthread.exe!std::find_if<std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >,bool (__cdecl*)(boost::intrusive_ptr<boost::detail::basic_cv_list_entry> const &)>(std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _First, std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _Last, bool (const boost::intrusive_ptr<boost::detail::basic_cv_list_entry> &)* _Pred)  Line 92 + 0x54 bytes    C++
iosthread.exe!std::remove_if<std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > >,bool (__cdecl*)(boost::intrusive_ptr<boost::detail::basic_cv_list_entry> const &)>(std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _First, std::_Vector_iterator<std::_Vector_val<boost::intrusive_ptr<boost::detail::basic_cv_list_entry>,std::allocator<boost::intrusive_ptr<boost::detail::basic_cv_list_entry> > > > _Last, bool (const boost::intrusive_ptr<boost::detail::basic_cv_list_entry> &)* _Pred)  Line 1848 + 0x58 bytes    C++
iosthread.exe!boost::detail::basic_condition_variable::notify_one()  Line 267 + 0xb4 bytes  C++
iosthread.exe!Cmd::NotifyFinish()  Line 41  C++
Liam asked Jul 11 '11 00:07


2 Answers

The problem is that the condition variable is a member of the Cmd object that is created by the client thread and is destroyed by that client thread when the wait is completed.

So you have a race condition where:

  • boost::condition_variable::notify_one() is called on the 'service thread'
  • that unblocks the client thread that's waiting on that condition variable
  • the client thread can then destroy the condition variable that the service thread is still working with in its call to notify_one.

So your observation that it's "as though the cmd object has disappeared before notify is called" is pretty much exactly what's happened, I think. Except that the Cmd object didn't disappear before notify_one() was called; it disappeared while notify_one() was doing its work. Your other note that "the boost::thread documentation states that notify_one() doesn't need to be called within the lock" is true, but that doesn't mean that the condition variable can be destroyed before notify_one() has returned.

You need to manage the lifetime of the Cmd object so that the service thread is done using it before it gets destroyed - holding the mutex that's in the Cmd object while notify_one() is called is one way to do that (as you've noticed). Or you can pull the condition variable out of the Cmd object so that its lifetime is independent of the Cmd object (maybe shared_ptr<> can help with that).
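As a rough sketch of the first option (notifying while the mutex is still held), something like the following should work. Note the assumptions: it uses the C++11 std:: primitives as stand-ins for the Boost ones so that it is self-contained, and SafeCmd and RunRounds are hypothetical names, not part of the original program.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Sketch only: std:: primitives stand in for the Boost ones in the question.
class SafeCmd   // hypothetical name
{
public:
    SafeCmd() : cnt_(0), done_(false) {}

    void BindInfo(int seq) { cnt_ = seq; }

    // Called by the service thread.
    void NotifyFinish()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        done_ = true;
        // Notify while still holding the mutex: the waiter cannot return
        // from Wait() (and destroy this object) until the lock is released,
        // which only happens after notify_one() has returned.
        cond_.notify_one();
    }

    // Called by the client thread.
    void Wait()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (!done_)
            cond_.wait(lock);
    }

    int cnt_;

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    bool done_;
};

// Hypothetical driver: issue `rounds` commands, each on a short-lived object,
// and count how many came back with the expected sequence number.
int RunRounds(int rounds)
{
    int completed = 0;
    for (int i = 0; i < rounds; ++i)
    {
        SafeCmd c;
        std::thread service([&c, i] { c.BindInfo(i); c.NotifyFinish(); });
        c.Wait();
        if (c.cnt_ == i)
            ++completed;
        service.join();   // std::thread must be joined before it is destroyed
    }
    return completed;
}
```

The only behavioural change from the NotifyFinish() in the question is that notify_one() moves inside the locked scope; the waiting_ flag is dropped here purely to keep the sketch short.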

Also, I believe the waiting_ member of the Cmd class is superfluous: you can call notify_one() or notify_all() when there are no waiters on a condition variable, since it already does that check for you. I don't think it's hurting anything; it's just complexity that doesn't need to be in the Cmd class.
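To illustrate the other suggestion (giving the synchronization state a lifetime independent of the client's stack frame), here is a minimal sketch using a shared_ptr and no waiting_ flag. It assumes C++11 std:: equivalents rather than Boost, and Completion, Finish, WaitFor and RunOnce are hypothetical names made up for the example.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical helper: synchronization state with its own lifetime.
struct Completion
{
    Completion() : done(false), result(0) {}
    std::mutex mutex;
    std::condition_variable cond;
    bool done;
    int result;
};

// Service side: owns a shared_ptr copy, so the state stays alive until
// notify_one() has returned, even if the client has already moved on.
void Finish(std::shared_ptr<Completion> state, int value)
{
    {
        std::lock_guard<std::mutex> lock(state->mutex);
        state->result = value;
        state->done = true;
    }
    state->cond.notify_one();   // outside the lock; `state` keeps it alive
}

// Client side: waits on the predicate only, no "waiting" flag needed.
int WaitFor(const std::shared_ptr<Completion>& state)
{
    std::unique_lock<std::mutex> lock(state->mutex);
    state->cond.wait(lock, [&state] { return state->done; });
    return state->result;
}

// Hypothetical driver for a single command.
int RunOnce(int value)
{
    std::shared_ptr<Completion> state = std::make_shared<Completion>();
    std::thread service(Finish, state, value);   // thread gets its own copy
    int got = WaitFor(state);
    state.reset();    // client drops its reference as soon as it is done
    service.join();
    return got;
}
```

Because the service side holds its own reference, the client dropping its copy right after the wait no longer races with notify_one(); whichever side finishes last destroys the state.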

Michael Burr answered Nov 01 '22 21:11


void ThreadFn()
{
    while (1)
    {
        Cmd c;
        ios.RunCmd(c);
        c.Wait();
        //std::cout << c.cnt_ << "\n";
    }
}

Since this loop is infinite, why not just declare Cmd c; outside the while(1) loop so that the same 'c' is reused on each iteration?

void ThreadFn()
{
    Cmd c;

    while (1)
    {   
        ios.RunCmd(c);
        c.Wait();
        //std::cout << c.cnt_ << "\n";
    }
}
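One caveat with reusing the object, going by the Cmd class as posted: waitPred_ is never set back to false, so after the first command Wait() would return immediately. A reusable variant would need to re-arm the flag, roughly as sketched below. This is an assumption-laden sketch: it uses C++11 std:: primitives instead of Boost, and ReusableCmd and RunReused are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

class ReusableCmd   // hypothetical name
{
public:
    ReusableCmd() : done_(false), finished_(0) {}

    // Called by the service thread.
    void NotifyFinish()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        done_ = true;
        ++finished_;
        cond_.notify_one();
    }

    // Called by the client thread; returns how many commands have
    // finished so far.
    int Wait()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (!done_)
            cond_.wait(lock);
        done_ = false;   // re-arm so the next Wait() blocks again
        return finished_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    bool done_;
    int finished_;
};

// Hypothetical driver: reuse one object for every round and count the
// rounds in which Wait() saw exactly one new completion.
int RunReused(int rounds)
{
    ReusableCmd c;
    int inStep = 0;
    for (int i = 0; i < rounds; ++i)
    {
        std::thread service([&c] { c.NotifyFinish(); });
        if (c.Wait() == i + 1)
            ++inStep;
        service.join();
    }
    return inStep;
}
```

Since the object now lives for the whole thread function, it is no longer destroyed while the service thread may still be inside notify_one().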
Nathan answered Nov 01 '22 20:11