I have a Subject which offers Subscribe(Observer*) and Unsubscribe(Observer*) to clients. Subject runs in its own thread (from which it calls Notify() on subscribed Observers) and a mutex protects its internal list of Observers.
I would like client code - which I don't control - to be able to safely delete an Observer after it is unsubscribed. How can this be achieved?
Edit
Some illustrative code follows. The problem is how to prevent Unsubscribe happening while Run is at the 'Problem here' comment. Then I could call back on a deleted object. Alternatively, if I hold the mutex throughout rather than making the copy, I can deadlock certain clients.
#include <algorithm>
#include <set>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
using namespace std;
using namespace boost;
class Observer
{
public:
    void Notify() {}
};
class Subject
{
public:
    Subject() : t(bind(&Subject::Run, this))
    {
    }
    void Subscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.insert(o);
    }
    void Unsubscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.erase(o);
    }
    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen(); // placeholder, not defined here
            set<Observer*> notifyList;
            {
                mutex::scoped_lock l(m);
                notifyList = observers;
            }
            // Problem here
            for_each(notifyList.begin(), notifyList.end(),
                     mem_fun(&Observer::Notify));
        }
    }
private:
    set<Observer*> observers;
    mutex m;  // declared before t so it is constructed before the thread starts
    thread t;
};
Edit
I can't Notify observers while holding the mutex because of the deadlock risk. The most obvious way this can happen - the client calls Subscribe or Unsubscribe from inside Notify - is easily remedied by making the mutex recursive. More insidious is the risk of intermittent deadlock on different threads.
I'm in a multithreaded environment, so at any point in a thread's execution, it will typically hold a sequence of locks L1, L2, ... Ln. Another thread will hold locks K1, K2, ... Km. A properly written client will ensure that different threads will always acquire locks in the same order. But when clients interact with my Subject's mutex - call it X - this strategy will be broken: Calls to Subscribe / Unsubscribe acquire locks in the order L1, L2, ... Ln, X. Calls to Notify from my Subject thread acquire locks in the order X, K1, K2, ... Km. If any of the Li or Kj can coincide down any call path, the client suffers an intermittent deadlock, with little prospect of debugging it. Since I don't control the client code, I can't do this.
Unsubscribe() should be synchronous, so that it does not return until Observer is guaranteed not to be in Subject's list anymore. That's the only way to do it safely.
ETA (moving my comment to the answer):
Since time doesn't seem to be an issue, take and release the mutex between notifying each observer. You won't be able to use for_each the way you are now, and you'll have to check the iterator to ensure that it's still valid.
for ( ... )
{
take mutex
check iterator validity
notify
release mutex
}
That will do what you want.
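As a rough sketch of that loop (not the asker's code), here is one way it could look with std::mutex-family types instead of Boost. "Check iterator validity" is interpreted here as re-checking, under the lock, that the pointer is still in the set before dereferencing it. NotifyAll() and the notified counter are names made up for this illustration, and the recursive mutex is an assumption so that an Observer may unsubscribe from inside Notify(). The mutex is still held during each Notify() call, so the lock-ordering concern raised in the question is not addressed by this variant.

```cpp
#include <cassert>
#include <mutex>
#include <set>
#include <vector>

struct Observer {
    int notified = 0;                 // hypothetical counter, for illustration only
    void Notify() { ++notified; }
};

class Subject {
public:
    void Subscribe(Observer* o) {
        std::lock_guard<std::recursive_mutex> l(m);
        observers.insert(o);
    }
    void Unsubscribe(Observer* o) {
        // Blocks while a Notify() is in flight on another thread, so once
        // this returns the Subject will not call o again.
        std::lock_guard<std::recursive_mutex> l(m);
        observers.erase(o);
    }
    // One notification round; the Run() loop would call this after each event.
    void NotifyAll() {
        std::vector<Observer*> snapshot;
        {
            std::lock_guard<std::recursive_mutex> l(m);
            snapshot.assign(observers.begin(), observers.end());
        }
        for (Observer* o : snapshot) {
            // Take and release the mutex around each individual notification.
            std::lock_guard<std::recursive_mutex> l(m);
            // Do not dereference o until it is confirmed to be still subscribed.
            if (observers.count(o) != 0)
                o->Notify();
        }
    }
private:
    std::recursive_mutex m;
    std::set<Observer*> observers;
};
```

With this shape, a client can delete an Observer as soon as Unsubscribe() has returned, provided the delete does not happen from inside that Observer's own Notify().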
Can you change the signatures of Subscribe() and Unsubscribe()? Replacing the Observer* with something like shared_ptr<Observer> would make things easier.
EDIT: Replaced "easy" by "easier" above. For an example of how this is difficult to "get right", see the history of the Boost.Signals and of the adopted-but-not-yet-in-the-distribution Boost.Signals2 (formerly Boost.ThreadSafeSignals) libraries.
The "ideal" solution would involve using shared_ptr and weak_ptr. However, in order to be generic, it also has to account for the issue of Subject being dropped before some of its Observers (yes, that can happen too).
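#include <memory>
#include <mutex>
#include <set>
#include <boost/noncopyable.hpp>
class Observer;
class Subject {
public:
    void Subscribe(std::weak_ptr<Observer> o);
    void Unsubscribe(std::weak_ptr<Observer> o);
private:
    std::mutex mutex;
    // weak_ptr has no operator<, so the set needs an owner-based ordering
    std::set< std::weak_ptr<Observer>,
              std::owner_less< std::weak_ptr<Observer> > > observers;
};
class Observer: boost::noncopyable {
public:
    ~Observer();
    void Notify();
private:
    std::mutex mutex;
    std::weak_ptr<Subject> subject;
};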
With this structure, we create a cyclic graph, but with a judicious use of weak_ptr so that both Observer and Subject can be destroyed without coordination.
Note: I have assumed, for simplicity, that an Observer observes a single Subject at a time, but it could easily observe multiple subjects.
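To make the weak_ptr idea more concrete, here is a minimal sketch of how the notifying side might look. NotifyAll() and the notified counter are hypothetical names added for this example, and the Observer's back-reference to its Subject is left out. Each weak_ptr is promoted to a shared_ptr under the lock, and the notifications are then delivered with no lock held. An Observer that unsubscribes concurrently may still receive one last Notify(), but it cannot be destroyed underneath the call because the Subject holds a shared_ptr for the duration.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <set>
#include <utility>
#include <vector>

struct Observer {
    int notified = 0;                 // hypothetical counter, for illustration only
    void Notify() { ++notified; }
};

class Subject {
public:
    void Subscribe(std::weak_ptr<Observer> o) {
        std::lock_guard<std::mutex> l(m);
        observers.insert(std::move(o));
    }
    void Unsubscribe(const std::weak_ptr<Observer>& o) {
        std::lock_guard<std::mutex> l(m);
        observers.erase(o);
    }
    void NotifyAll() {
        std::vector<std::shared_ptr<Observer>> alive;
        {
            std::lock_guard<std::mutex> l(m);
            for (auto it = observers.begin(); it != observers.end();) {
                if (auto sp = it->lock()) {     // observer still exists
                    alive.push_back(std::move(sp));
                    ++it;
                } else {
                    it = observers.erase(it);   // expired: drop the entry
                }
            }
        }
        // No lock is held here; the shared_ptrs keep every target alive.
        for (auto& sp : alive)
            sp->Notify();
    }
private:
    std::mutex m;
    std::set<std::weak_ptr<Observer>,
             std::owner_less<std::weak_ptr<Observer>>> observers;
};
```

Clients then manage their observers through shared_ptr and never call delete themselves, which is exactly the signature change asked about above.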
Now, it seems that you are stuck with unsafe memory management. This is quite a difficult situation, as you can imagine. In this case, I would suggest an experiment: an asynchronous Unsubscribe. Or at least, the call to Unsubscribe will be synchronous from the outside, but be implemented asynchronously.
The idea is simple: we will use the event queue to achieve synchronization. That is:
Unsubscribe posts an event in the queue (payload Observer*) and then waits.
Once the Subject thread has processed the Unsubscribe event(s), it wakes up the waiting thread(s).
You can use either busy-waiting or a condition variable; I would advise a condition variable unless performance dictates otherwise.
Note: this solution completely fails to account for Subject dying prematurely.
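Here is a minimal sketch of the event-queue idea, assuming standard C++11 threading rather than Boost. The Event struct, PostAndWait(), Fire() and the ticket counters are all names invented for this illustration. Fire() stands in for "something interesting happened". The observer set is touched only by the Subject thread, so the mutex guards just the queue and is never held while Notify() runs. Two limits of the sketch: calling Subscribe() or Unsubscribe() from inside Notify() would block forever, and a caller that waits in Unsubscribe() while holding a lock that some Notify() needs will still stall.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <set>
#include <thread>

struct Observer {
    std::atomic<int> notified{0};     // hypothetical counter, for illustration only
    void Notify() { ++notified; }
};

class Subject {
public:
    Subject() : worker(&Subject::Run, this) {}
    ~Subject() {
        { std::lock_guard<std::mutex> l(m); stopping = true; }
        wake.notify_all();
        worker.join();
    }
    // Each call returns only after the Subject thread has handled the event.
    void Subscribe(Observer* o)   { PostAndWait({Event::Add, o}); }
    void Unsubscribe(Observer* o) { PostAndWait({Event::Remove, o}); }
    void Fire()                   { PostAndWait({Event::Signal, nullptr}); }

private:
    struct Event { enum Kind { Add, Remove, Signal } kind; Observer* who; };

    void PostAndWait(Event e) {
        std::unique_lock<std::mutex> l(m);
        queue.push_back(e);
        const std::uint64_t ticket = ++posted;   // position of this event
        wake.notify_one();
        done.wait(l, [&] { return processed >= ticket; });
    }

    void Run() {
        std::set<Observer*> observers;           // private to this thread
        std::unique_lock<std::mutex> l(m);
        for (;;) {
            wake.wait(l, [&] { return stopping || !queue.empty(); });
            if (queue.empty()) return;           // stopping and fully drained
            Event e = queue.front();
            queue.pop_front();
            l.unlock();                          // handle the event unlocked
            switch (e.kind) {
            case Event::Add:    observers.insert(e.who); break;
            case Event::Remove: observers.erase(e.who);  break;
            case Event::Signal:
                for (Observer* o : observers) o->Notify();
                break;
            }
            l.lock();
            ++processed;                         // events complete in FIFO order
            done.notify_all();                   // wake any waiting callers
        }
    }

    std::mutex m;
    std::condition_variable wake, done;
    std::deque<Event> queue;
    std::uint64_t posted = 0, processed = 0;
    bool stopping = false;
    std::thread worker;   // declared last so the other members exist first
};
```

Because events are handled strictly in order, once Unsubscribe() returns the Subject thread has finished any notification round that was already under way and has removed the pointer, so the client can delete the Observer.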