
Observer pattern using weak_ptr

I'm trying to write a safe Subject class from the observer pattern. I want to know if using weak_ptr is the best way to store IObserver instances in such a way that:

  • It is not possible to use an IObserver instance after it has been freed.
  • The Subject class does not hold on to IObserver references that should be freed (lapsed listener problem).
  • The Subject class must be thread safe.

Unfortunately, our coding standards say that we're not allowed to use boost. I guess I was a bad person in a previous life. Fortunately, I am allowed to use C++11 (as shipped with Visual Studio 2012).

Here is a sample Observer class.

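#include <algorithm>  // std::for_each, std::remove_if
#include <cstdio>     // printf
#include <memory>     // std::shared_ptr, std::weak_ptr
#include <mutex>      // std::mutex, std::lock_guard
#include <string>
#include <vector>

// Observer interface that supports notify() method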
class IObserver
{
public:
    virtual void notify() const = 0;
    virtual ~IObserver() {}
};

// Concrete observer implementation that prints a message
class Observer : public IObserver
{
public:
    Observer( const std::string& message) : m_message( message ){}

    void notify() const {
        printf( "%s\r\n", m_message.c_str() );
    }

private:
    std::string m_message;
};

And here is the Subject class.

// Subject which registers observers and notifies them as needed.
class Subject
{
public:
    // Use shared_ptr to guarantee the observer is valid right now
    void registerObserver( const std::shared_ptr<IObserver>& o )
    {
        std::lock_guard<std::mutex> guard( m_observersMutex );
        m_observers.push_back( o );
    }

    void unregisterObserver( const std::shared_ptr<IObserver>& o )
    {
        std::lock_guard<std::mutex> guard( m_observersMutex );
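        // Remove the given observer from m_observers, along with any that have expired.
        m_observers.erase( std::remove_if( m_observers.begin(), m_observers.end(),
            [&o]( const std::weak_ptr<IObserver>& w )
        {
            return w.lock() == o;
        } ), m_observers.end() );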
    }

    // This is a method that is run in its own thread that notifies observers of some event
    void doNotify()
    {
        std::lock_guard<std::mutex> guard( m_observersMutex );
        // Notify any valid observers of events.
        std::for_each( m_observers.cbegin(), m_observers.cend(), 
            []( const std::weak_ptr<IObserver>& o )
        {
            auto observer = o.lock();
            if ( observer ) {
                observer->notify();
            } 
        } );

        // Remove any dead observers.  These are ones which have expired().
        m_observers.erase( std::remove_if( m_observers.begin(), m_observers.end(), 
            []( const std::weak_ptr<IObserver>& o )
        {
            return o.expired();
        } ), m_observers.end() );

    }


private:
    std::vector<std::weak_ptr<IObserver>> m_observers;
    std::mutex m_observersMutex;
};

Here's some code that exercises Subject:

int main()
{

    Subject subject;
    auto observerHello = std::make_shared<Observer>( "Hello world" );
    subject.registerObserver( observerHello );
    {
        // Create a scope to show unregistration.
        auto observerBye = std::make_shared<Observer>( "Good bye" );
        subject.registerObserver( observerBye );

        subject.doNotify();
    }
    printf( "%s\r\n", "Observer good bye is now destructed" );
    subject.doNotify();
    return 0;
}

Is my usage of weak_ptr thread-safe? From here https://stackoverflow.com/a/2160422/1517648 I think it is.

Is this a legitimate way of solving the lapsed listener problem?

Steve asked Jan 17 '13 14:01


1 Answer

I'd be a bit leery of your doNotify. Suppose an observer you fire ends up adding or removing observers: it calls back into Subject while doNotify still holds m_observersMutex, and bad things happen (including crashes). Or suppose the observer blocks on the action of another thread, which is itself blocked trying to add an observer: bad things happen (deadlocks!).

This is tricky to solve. Basically, it is a problem with reentrancy.

Never, ever leave control of your code when you hold a lock. Holding a lock while calling a callback is a no-no.

So, at a minimum:

Lock, then copy your list, then unlock. While making this copy, you can also remove expired observers (from both the original list and the copy).

Then fire off observers from the copied list.

This leaves some problems unresolved, such as the fact that removing an observer does not guarantee that it won't be called in the future! A notification that has already copied the list can still reach it, so removal only means that it will eventually stop being called.

How important that is depends on how you use listening.
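
To make the "called after removal" window concrete, here is a minimal single-threaded sketch that replays the interleaving step by step. Listener, its active flag, and lateNotifyDemo are hypothetical names invented for this illustration, not part of the question's code, and the flag is only one possible mitigation under the assumption that the observer can afford to ignore late calls.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical listener that can be switched off and records what it saw.
struct Listener
{
    std::atomic<bool> active;
    std::atomic<int> handled;  // notifications processed while active
    std::atomic<int> ignored;  // notifications that arrived after deactivation

    Listener() : active( true ), handled( 0 ), ignored( 0 ) {}

    void notify()
    {
        if ( active.load() ) {
            ++handled;
        } else {
            ++ignored;
        }
    }
};

// Replays the race in one thread and returns { handled, ignored }.
std::vector<int> lateNotifyDemo()
{
    std::vector<std::weak_ptr<Listener>> observers;
    auto listener = std::make_shared<Listener>();
    observers.push_back( listener );

    // Step 1: the notifier copies the list (what happens under the lock).
    std::vector<std::shared_ptr<Listener>> targets;
    for ( const auto& w : observers ) {
        if ( auto p = w.lock() ) {
            targets.push_back( p );
        }
    }
    assert( targets.size() == 1 );

    // Step 2: "another thread" unregisters the listener before the broadcast.
    observers.clear();
    listener->active = false;

    // Step 3: the broadcast still reaches the listener through the copy.
    for ( const auto& t : targets ) {
        t->notify();
    }

    std::vector<int> result;
    result.push_back( listener->handled.load() );
    result.push_back( listener->ignored.load() );
    return result;
}
```

Calling lateNotifyDemo() returns { 0, 1 }: the listener was called once after it had been removed, and only its own flag let it discard the call. Whether a flag like this is enough depends on what the observer touches when it is notified.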

One approach that might work is a task queue that includes add/remove/notify/killthread events (making killthread a task in the queue makes shutting down far less annoying). Now all synchronization is on the queue. If you aren't up to writing a non-blocking lock-free queue, the notify code can simply lock, std::move the queue, unlock, and then proceed to execute it. Or you could write a queue such that pop blocks until there is something to read, and push doesn't block.
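
As a rough sketch of the second variant (pop blocks, push does not wait), something like the following could work. TaskQueue and taskQueueDemo are hypothetical names, observers are stood in for by plain strings to keep the example short, and push still takes the mutex briefly, so this is not a lock-free queue.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical queue: pop() blocks until a task exists, push() never waits for a consumer.
class TaskQueue
{
public:
    typedef std::function<void()> Task;

    void push( Task task )
    {
        assert( task );  // an empty std::function would throw when invoked
        {
            std::lock_guard<std::mutex> guard( m_mutex );
            m_tasks.push_back( std::move( task ) );
        }
        m_ready.notify_one();
    }

    Task pop()
    {
        std::unique_lock<std::mutex> lock( m_mutex );
        m_ready.wait( lock, [this] { return !m_tasks.empty(); } );
        Task task = std::move( m_tasks.front() );
        m_tasks.pop_front();
        return task;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::deque<Task> m_tasks;
};

// Add, remove, notify and killthread are all tasks run by one worker thread.
std::vector<std::string> taskQueueDemo()
{
    TaskQueue queue;
    std::vector<std::string> observers;  // touched only by the worker
    std::vector<std::string> log;        // touched only by the worker until join()
    bool running = true;                 // touched only by the worker

    std::thread worker( [&] {
        while ( running ) {
            queue.pop()();  // the task runs with no lock held
        }
    } );

    auto notifyAll = [&] {
        for ( const auto& name : observers ) {
            log.push_back( name );
        }
    };

    queue.push( [&] { observers.push_back( "hello" ); } );  // add
    queue.push( [&] { observers.push_back( "bye" ); } );    // add
    queue.push( notifyAll );                                 // notify
    queue.push( [&] {                                        // remove
        observers.erase( std::remove( observers.begin(), observers.end(),
            std::string( "bye" ) ), observers.end() );
    } );
    queue.push( notifyAll );                                 // notify
    queue.push( [&] { running = false; } );                  // killthread
    worker.join();
    return log;
}
```

Because the observer list is only ever touched by the worker, it needs no mutex of its own, and a remove task queued before a notify task is guaranteed to take effect first. Here taskQueueDemo() returns { "hello", "bye", "hello" }.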

A quick-and-dirty "copy and broadcast" might look like this:

std::vector<std::shared_ptr<IObserver>> targets;
{
  std::lock_guard<std::mutex> guard( m_observersMutex );
  m_observers.erase( std::remove_if( m_observers.begin(), m_observers.end(), 
        [&targets]( const std::weak_ptr<IObserver>& o )
    {
      std::shared_ptr<IObserver> ptr = o.lock();
      if (ptr) {
        targets.push_back(ptr);
        return false;
      } else {
        return true;
      }
    } ), m_observers.end() );
}

for( auto& target:targets ) {
  target->notify();
}
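
For completeness, here is a sketch of the snippet above placed inside a full Subject, exercised by an observer that re-enters registerObserver from its own notify(). ReentrantObserver, CountingObserver, reentrancyDemo, and the return value of doNotify are additions made for this illustration only; IObserver and Subject follow the question's code.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

class IObserver
{
public:
    virtual void notify() const = 0;
    virtual ~IObserver() {}
};

class Subject
{
public:
    void registerObserver( const std::shared_ptr<IObserver>& o )
    {
        std::lock_guard<std::mutex> guard( m_observersMutex );
        m_observers.push_back( o );
    }

    // Returns how many observers were notified (illustration only).
    std::size_t doNotify()
    {
        std::vector<std::shared_ptr<IObserver>> targets;
        {
            // Hold the lock only while pruning and copying.
            std::lock_guard<std::mutex> guard( m_observersMutex );
            m_observers.erase( std::remove_if( m_observers.begin(), m_observers.end(),
                [&targets]( const std::weak_ptr<IObserver>& o ) -> bool
            {
                std::shared_ptr<IObserver> ptr = o.lock();
                if ( ptr ) {
                    targets.push_back( ptr );
                    return false;
                }
                return true;
            } ), m_observers.end() );
        }
        // The lock is released here, so callbacks may call back into Subject.
        for ( const auto& target : targets ) {
            target->notify();
        }
        return targets.size();
    }

private:
    std::vector<std::weak_ptr<IObserver>> m_observers;
    std::mutex m_observersMutex;
};

// Hypothetical observer that counts its notifications.
class CountingObserver : public IObserver
{
public:
    CountingObserver() : m_count( 0 ) {}
    void notify() const { ++m_count; }
    int count() const { return m_count.load(); }
private:
    mutable std::atomic<int> m_count;
};

// Hypothetical observer that registers another observer from inside notify().
class ReentrantObserver : public IObserver
{
public:
    ReentrantObserver( Subject& subject, std::shared_ptr<IObserver> extra )
        : m_subject( subject ), m_extra( std::move( extra ) ) {}
    void notify() const { m_subject.registerObserver( m_extra ); }
private:
    Subject& m_subject;
    std::shared_ptr<IObserver> m_extra;
};

// Returns { notified in round 1, notified in round 2, calls seen by extra }.
std::vector<int> reentrancyDemo()
{
    Subject subject;
    auto extra = std::make_shared<CountingObserver>();
    auto reentrant = std::make_shared<ReentrantObserver>( subject, extra );
    subject.registerObserver( reentrant );

    std::vector<int> result;
    result.push_back( static_cast<int>( subject.doNotify() ) );
    assert( extra->count() == 0 );  // registered during round 1, not yet notified
    result.push_back( static_cast<int>( subject.doNotify() ) );
    result.push_back( extra->count() );
    return result;
}
```

With the lock held during the callbacks, the registerObserver call inside notify() would try to lock a std::mutex its own thread already owns, which is undefined behavior. With the copy, reentrancyDemo() returns { 1, 2, 1 }: the observer added during the first round is simply picked up by the next one.
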
Yakk - Adam Nevraumont answered Sep 18 '22 19:09