I'm trying to write a safe Subject class from the observer pattern. I want to know if using weak_ptr is the best way to store IObserver instances in such a way that:
- An IObserver instance is never used after it has been freed.
- The Subject class does not hold on to IObserver references that should be freed (the lapsed listener problem).
- The Subject class is thread safe.
Unfortunately, our coding standards say that we're not allowed to use boost. I guess I was a bad person in a previous life. Fortunately, I am allowed to use C++11 (what is shipped with Visual Studio 2012).
Here is a sample Observer class.
#include <algorithm>
#include <cstdio>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

// Observer interface that supports notify() method
class IObserver
{
public:
    virtual void notify() const = 0;
    virtual ~IObserver() {}
};

// Concrete observer implementation that prints a message
class Observer : public IObserver
{
public:
    Observer( const std::string& message ) : m_message( message ) {}
    void notify() const {
        printf( "%s\r\n", m_message.c_str() );
    }
private:
    std::string m_message;
};
And here is the Subject class.
// Subject which registers observers and notifies them as needed.
class Subject
{
public:
    // Use shared_ptr to guarantee the observer is valid right now
    void registerObserver( const std::shared_ptr<IObserver>& o )
    {
        std::lock_guard<std::mutex> guard( m_observersMutex );
        m_observers.push_back( o );
    }

    void unregisterObserver( const std::shared_ptr<IObserver>& o )
    {
        std::lock_guard<std::mutex> guard( m_observersMutex );
        // Remove the observer from m_observers. One possible implementation:
        // drop every entry whose locked pointer compares equal to o.
        m_observers.erase( std::remove_if( m_observers.begin(), m_observers.end(),
            [&o]( const std::weak_ptr<IObserver>& w )
            {
                return w.lock() == o;
            } ), m_observers.end() );
    }

    // This is a method that is run in its own thread that notifies observers of some event
    void doNotify()
    {
        std::lock_guard<std::mutex> guard( m_observersMutex );
        // Notify any valid observers of events.
        std::for_each( m_observers.cbegin(), m_observers.cend(),
            []( const std::weak_ptr<IObserver>& o )
            {
                auto observer = o.lock();
                if ( observer ) {
                    observer->notify();
                }
            } );

        // Remove any dead observers. These are ones which have expired().
        m_observers.erase( std::remove_if( m_observers.begin(), m_observers.end(),
            []( const std::weak_ptr<IObserver>& o )
            {
                return o.expired();
            } ), m_observers.end() );
    }

private:
    std::vector<std::weak_ptr<IObserver>> m_observers;
    std::mutex m_observersMutex;
};
Here's some code that exercises Subject:
int main()
{
    Subject subject;
    auto observerHello = std::make_shared<Observer>( "Hello world" );
    subject.registerObserver( observerHello );
    {
        // Create a scope to show unregistration.
        auto observerBye = std::make_shared<Observer>( "Good bye" );
        subject.registerObserver( observerBye );
        subject.doNotify();
    }
    printf( "%s\r\n", "Observer good bye is now destructed" );
    subject.doNotify();
    return 0;
}
Is my usage of weak_ptr thread-safe? From here https://stackoverflow.com/a/2160422/1517648 I think it is.
Is this a legitimate way of solving the lapsed listener problem?
I'd be a bit leery of your doNotify. Suppose something in an observer you fire ends up adding or removing observers? Bad things happen (including crashes). Or suppose it blocks on the action of another thread, which in turn blocks trying to add an observer? Bad things happen (deadlocks!).
This is tricky to solve. Basically, it is a problem with reentrancy.
Never, ever leave control of your code when you hold a lock. Holding a lock while calling a callback is a no-no.
So, at a minimum:
Lock, copy your list, then unlock. While making this copy, you can also remove expired observers (from both the original list and the copy).
Then fire off observers from the copied list.
This leaves some problems unresolved, such as the fact that removing an observer does not guarantee that it won't be called in the future! It just means that eventually it won't be called.
How important that is depends on how you use listening.
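To make that caveat concrete, here is a minimal single-threaded sketch, assuming the lock/copy/unlock scheme described above. CallbackObserver and runLateCallDemo are made-up names for illustration, and this Subject is a stripped-down stand-in rather than the class from the question. The first observer unregisters the second while the notification is in flight, yet the second is still called once because it is already in the copied list.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

class IObserver
{
public:
    virtual void notify() const = 0;
    virtual ~IObserver() {}
};

// Hypothetical observer that runs an arbitrary callback when notified.
class CallbackObserver : public IObserver
{
public:
    explicit CallbackObserver( std::function<void()> fn ) : m_fn( std::move( fn ) ) {}
    void notify() const override { m_fn(); }
private:
    std::function<void()> m_fn;
};

// Stripped-down subject that copies its list under the lock and notifies outside it.
class Subject
{
public:
    void registerObserver( const std::shared_ptr<IObserver>& o )
    {
        std::lock_guard<std::mutex> guard( m_mutex );
        m_observers.push_back( o );
    }
    void unregisterObserver( const std::shared_ptr<IObserver>& o )
    {
        std::lock_guard<std::mutex> guard( m_mutex );
        m_observers.erase( std::remove_if( m_observers.begin(), m_observers.end(),
            [&o]( const std::weak_ptr<IObserver>& w ) { return w.lock() == o; } ),
            m_observers.end() );
    }
    void doNotify()
    {
        std::vector<std::shared_ptr<IObserver>> targets;
        {
            std::lock_guard<std::mutex> guard( m_mutex );
            for ( const auto& w : m_observers ) {
                if ( auto p = w.lock() ) targets.push_back( p );
            }
        }
        // The lock is released here, so observers may call back into Subject.
        for ( auto& t : targets ) t->notify();
    }
private:
    std::vector<std::weak_ptr<IObserver>> m_observers;
    std::mutex m_mutex;
};

// Returns how often the second observer ran across two notifications.
int runLateCallDemo()
{
    Subject subject;
    int lateCalls = 0;
    auto second = std::make_shared<CallbackObserver>( [&]{ ++lateCalls; } );
    auto first = std::make_shared<CallbackObserver>(
        [&]{ subject.unregisterObserver( second ); } );
    subject.registerObserver( first );
    subject.registerObserver( second );
    subject.doNotify();  // first unregisters second, but second is already in the copy
    subject.doNotify();  // second is no longer in the list, so it is not called again
    return lateCalls;
}
```

If a caller needs a hard guarantee that an observer is never invoked after unregistering, the observer itself would have to check some "still interested" flag inside notify(); the copy alone cannot provide that.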
One approach that might work is a task queue that includes add/remove/notify/killthread events (making killthread a task in the queue makes shutting down far less annoying). Now all synchronization is on the queue. If you aren't up to writing a non-blocking lock-free queue, the notify code can simply lock, std::move the queue, unlock, then proceed to execute it. Or you could write a queue such that pop blocks until there is something to read, and push doesn't block.
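As a rough sketch of the simpler variant (lock, std::move the queue, unlock, execute), something like the following might do. TaskQueue, push, drain and runQueueDemo are names invented here, tasks are plain std::function closures rather than dedicated add/remove/notify/killthread event types, and the blocking-pop variant is not shown.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical task queue: any thread pushes closures, one consumer drains them.
class TaskQueue
{
public:
    typedef std::function<void()> Task;

    // Holds the lock only long enough to append the task.
    void push( Task task )
    {
        std::lock_guard<std::mutex> guard( m_mutex );
        m_tasks.push_back( std::move( task ) );
    }

    // Takes the whole pending batch under the lock, then runs it with the
    // lock released, so a task may safely call push() itself.
    // Returns the number of tasks executed.
    std::size_t drain()
    {
        std::vector<Task> batch;
        {
            std::lock_guard<std::mutex> guard( m_mutex );
            batch = std::move( m_tasks );
            m_tasks.clear();  // leave the moved-from vector in a known empty state
        }
        for ( auto& task : batch ) {
            task();
        }
        return batch.size();
    }

private:
    std::mutex m_mutex;
    std::vector<Task> m_tasks;
};

// A task that enqueues another task while it is being executed.
int runQueueDemo()
{
    TaskQueue queue;
    int value = 0;
    queue.push( [&]{ value += 1; queue.push( [&]{ value += 10; } ); } );
    std::size_t first = queue.drain();   // runs the outer task, which enqueues the inner one
    std::size_t second = queue.drain();  // runs the inner task
    assert( first == 1 && second == 1 );
    return value;
}
```

In an observer setting, the consumer thread would own the observer list outright, and registering, unregistering and notifying would each be pushed as a task, so the list itself needs no mutex.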
A quick-and-dirty "copy and broadcast" might look like this:
std::vector<std::shared_ptr<IObserver>> targets;
{
    std::lock_guard<std::mutex> guard( m_observersMutex );
    m_observers.erase( std::remove_if( m_observers.begin(), m_observers.end(),
        [&targets]( const std::weak_ptr<IObserver>& o )
        {
            std::shared_ptr<IObserver> ptr = o.lock();
            if (ptr) {
                targets.push_back(ptr);
                return false;
            } else {
                return true;
            }
        } ), m_observers.end() );
}
for( auto& target:targets ) {
    target->notify();
}
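For context, here is one way that snippet might sit inside a complete Subject, together with an observer that registers another observer from inside notify(). CountingObserver, ChainObserver, observerCount and runReentrancyDemo are hypothetical names added for the demonstration. With the lock held during the callbacks, as in the question, the nested registerObserver call would try to lock a std::mutex the thread already owns. With the copy, it simply takes effect on the next notification.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

class IObserver
{
public:
    virtual void notify() const = 0;
    virtual ~IObserver() {}
};

class Subject
{
public:
    void registerObserver( const std::shared_ptr<IObserver>& o )
    {
        std::lock_guard<std::mutex> guard( m_observersMutex );
        m_observers.push_back( o );
    }

    void doNotify()
    {
        std::vector<std::shared_ptr<IObserver>> targets;
        {
            // Under the lock: drop expired entries and collect strong pointers.
            std::lock_guard<std::mutex> guard( m_observersMutex );
            m_observers.erase( std::remove_if( m_observers.begin(), m_observers.end(),
                [&targets]( const std::weak_ptr<IObserver>& o ) -> bool
                {
                    std::shared_ptr<IObserver> ptr = o.lock();
                    if ( ptr ) {
                        targets.push_back( ptr );
                        return false;
                    }
                    return true;
                } ), m_observers.end() );
        }
        // Lock released: observers may now call back into Subject.
        for ( auto& target : targets ) {
            target->notify();
        }
    }

    // Hypothetical helper, only used to inspect the list in the demo.
    std::size_t observerCount()
    {
        std::lock_guard<std::mutex> guard( m_observersMutex );
        return m_observers.size();
    }

private:
    std::vector<std::weak_ptr<IObserver>> m_observers;
    std::mutex m_observersMutex;
};

// Hypothetical observer that counts how often it was notified.
class CountingObserver : public IObserver
{
public:
    CountingObserver() : m_calls( 0 ) {}
    void notify() const override { ++m_calls; }
    int calls() const { return m_calls; }
private:
    mutable int m_calls;
};

// Hypothetical observer that registers another observer when notified.
class ChainObserver : public IObserver
{
public:
    ChainObserver( Subject& subject, const std::shared_ptr<IObserver>& extra )
        : m_subject( subject ), m_extra( extra ) {}
    void notify() const override { m_subject.registerObserver( m_extra ); }
private:
    Subject& m_subject;
    std::shared_ptr<IObserver> m_extra;
};

int runReentrancyDemo()
{
    Subject subject;
    auto counter = std::make_shared<CountingObserver>();
    auto chain = std::make_shared<ChainObserver>( subject, counter );
    subject.registerObserver( chain );
    subject.doNotify();  // chain registers counter; counter is not in this copy
    assert( subject.observerCount() == 2 );
    subject.doNotify();  // counter is in this copy and gets notified once
    return counter->calls();
}
```

Note that the observers registered during a notification only see later events, which is the mirror image of the "removed but still called" caveat above.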