
How do I tear down observer relationship in multithreaded C++?

I have a Subject which offers Subscribe(Observer*) and Unsubscribe(Observer*) to clients. Subject runs in its own thread (from which it calls Notify() on subscribed Observers) and a mutex protects its internal list of Observers.

I would like client code - which I don't control - to be able to safely delete an Observer after it is unsubscribed. How can this be achieved?

  • Holding the mutex - even a recursive mutex - while I notify observers isn't an option because of the deadlock risk.
  • I could mark an observer for removal in the Unsubscribe call and remove it from the Subject thread. Then clients could wait for a special 'Safe to delete' notification. This looks safe, but is onerous for clients.

Edit

Some illustrative code follows. The problem is how to prevent Unsubscribe happening while Run is at the 'Problem here' comment. Then I could call back on a deleted object. Alternatively, if I hold the mutex throughout rather than making the copy, I can deadlock certain clients.

#include <algorithm>   // for_each
#include <set>
#include <functional>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

using namespace std;
using namespace boost;

class Observer
{
public:
    void Notify() {}
};

class Subject
{
public:
    Subject() : t(bind(&Subject::Run, this))
    {
    }

    void Subscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.insert(o);
    }

    void Unsubscribe(Observer* o)
    {
        mutex::scoped_lock l(m);
        observers.erase(o);
    }

    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            set<Observer*> notifyList;
            {
                mutex::scoped_lock l(m);
                notifyList = observers;
            }
            // Problem here
            for_each(notifyList.begin(), notifyList.end(), 
                     mem_fun(&Observer::Notify));
        }
    }

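private:
    set<Observer*> observers;
    // m is declared before t: members are constructed in declaration order,
    // and the thread may enter Run() and lock m as soon as t is constructed.
    mutex m;
    thread t;
};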

Edit

I can't Notify observers while holding the mutex because of the deadlock risk. The most obvious way this can happen - the client calls Subscribe or Unsubscribe from inside Notify - is easily remedied by making the mutex recursive. More insidious is the risk of intermittent deadlock on different threads.

I'm in a multithreaded environment, so at any point in its execution a thread will typically hold a sequence of locks L1, L2, ... Ln. Another thread will hold locks K1, K2, ... Km. A properly written client ensures that different threads always acquire locks in the same order. But when clients interact with my Subject's mutex - call it X - this strategy breaks: calls to Subscribe / Unsubscribe acquire locks in the order L1, L2, ... Ln, X, while calls to Notify from my Subject thread acquire locks in the order X, K1, K2, ... Km. If any of the Li and Kj can coincide down any call path, the client suffers an intermittent deadlock, with little prospect of debugging it. Since I don't control the client code, I can't rule this out, so I can't hold X while calling Notify.

fizzer asked Feb 10 '09 20:02


3 Answers

Unsubscribe() should be synchronous, so that it does not return until Observer is guaranteed not to be in Subject's list anymore. That's the only way to do it safely.

ETA (moving my comment to the answer):

Since time doesn't seem to be an issue, take and release the mutex between notifying each observer. You won't be able to use for_each the way you are now, and you'll have to check the iterator to ensure that it's still valid.

for ( ... )
{
    take mutex
    check iterator validity
    notify
    release mutex
}

That will do what you want.
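
A minimal sketch of that loop, assuming C++11 and a virtual `Notify()` (both are assumptions; the question uses Boost and a non-virtual `Notify`). `NotifyAll` stands in for one pass of the loop in `Run`, and `CountingObserver` is a hypothetical observer added only to make the example usable. The mutex is recursive so that an observer can unsubscribe from inside its own `Notify`.

```cpp
#include <cassert>
#include <mutex>
#include <set>
#include <vector>

class Observer {
public:
    virtual ~Observer() = default;
    virtual void Notify() = 0;
};

// Hypothetical observer, present only to exercise the sketch.
class CountingObserver : public Observer {
public:
    void Notify() override { ++count; }
    int count = 0;
};

class Subject {
public:
    void Subscribe(Observer* o) {
        assert(o != nullptr);
        std::lock_guard<std::recursive_mutex> l(m);
        observers.insert(o);
    }

    // Blocks while a Notify() is running on another thread, so once this
    // returns the observer will not be called again.
    void Unsubscribe(Observer* o) {
        std::lock_guard<std::recursive_mutex> l(m);
        observers.erase(o);
    }

    // One notification pass (the body of the for(;;) loop in Run()).
    void NotifyAll() {
        std::vector<Observer*> snapshot;
        {
            std::lock_guard<std::recursive_mutex> l(m);
            snapshot.assign(observers.begin(), observers.end());
        }
        for (Observer* o : snapshot) {
            // take mutex
            std::lock_guard<std::recursive_mutex> l(m);
            // check validity: the pointer is only compared, never dereferenced,
            // unless it is still subscribed
            if (observers.count(o) != 0) {
                o->Notify();
            }
            // mutex released at the end of each iteration
        }
    }

private:
    std::recursive_mutex m;
    std::set<Observer*> observers;
};
```

Because the mutex is held during each `Notify`, an `Unsubscribe` from another thread blocks until the call in progress finishes, which is what makes deleting the observer afterwards safe. The cost is the lock-ordering risk described in the question: a client that holds one of its own locks while calling `Unsubscribe` can still deadlock against a `Notify` that wants the same lock.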

Rob K answered Sep 20 '22 23:09


Can you change the signature of Subscribe() and Unsubscribe()? Replacing the Observer* with something like shared_ptr<Observer> would make things easier.

EDIT: Replaced "easy" by "easier" above. For an example of how this is difficult to "get right", see the history of the Boost.Signals and of the adopted-but-not-yet-in-the-distribution Boost.Signals2 (formerly Boost.ThreadSafeSignals) libraries.

Éric Malenfant answered Sep 19 '22 23:09


The "ideal" solution would involve using shared_ptr and weak_ptr. However, in order to be generic, it also has to account for the issue of Subject being dropped before some of its Observers (yes, that can happen too).

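#include <memory>
#include <mutex>
#include <set>
#include <boost/noncopyable.hpp>

class Observer;

class Subject {
public:
    void Subscribe(std::weak_ptr<Observer> o);
    void Unsubscribe(std::weak_ptr<Observer> o);

private:
    std::mutex mutex;
    // weak_ptr has no operator<, so the set needs std::owner_less to order its elements
    std::set< std::weak_ptr<Observer>, std::owner_less< std::weak_ptr<Observer> > > observers;
};

class Observer: boost::noncopyable {
public:
    ~Observer();

    void Notify();

private:
    std::mutex mutex;
    std::weak_ptr<Subject> subject;
};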

With this structure, we create a cyclic graph, but with a judicious use of weak_ptr so that both Observer and Subject can be destroyed without coordination.

Note: I have assumed, for simplicity, that an Observer observes a single Subject at a time, but it could easily observe multiple subjects.
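
To make the Subject side concrete, here is a rough sketch of how notification could work with weak_ptr, assuming C++11 and a virtual `Notify()`. `NotifyAll` and `CountingObserver` are hypothetical names introduced for illustration, and the back-pointer from Observer to Subject is left out to keep it short.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <set>
#include <vector>

class Observer {
public:
    virtual ~Observer() = default;
    virtual void Notify() = 0;
};

// Hypothetical observer, present only to exercise the sketch.
class CountingObserver : public Observer {
public:
    void Notify() override { ++count; }
    int count = 0;
};

class Subject {
public:
    void Subscribe(std::weak_ptr<Observer> o) {
        assert(!o.expired());
        std::lock_guard<std::mutex> l(m);
        observers.insert(std::move(o));
    }

    void Unsubscribe(const std::weak_ptr<Observer>& o) {
        std::lock_guard<std::mutex> l(m);
        observers.erase(o);
    }

    // One notification pass; the mutex is NOT held while calling Notify().
    void NotifyAll() {
        std::vector<std::weak_ptr<Observer>> snapshot;
        {
            std::lock_guard<std::mutex> l(m);
            for (auto it = observers.begin(); it != observers.end();) {
                if (it->expired()) {
                    it = observers.erase(it);  // drop observers that already died
                } else {
                    snapshot.push_back(*it);
                    ++it;
                }
            }
        }
        for (const auto& weak : snapshot) {
            // lock() yields a shared_ptr that keeps the observer alive for the
            // duration of the call, or an empty one if it is already gone.
            if (std::shared_ptr<Observer> alive = weak.lock()) {
                alive->Notify();
            }
        }
    }

private:
    std::mutex m;
    std::set<std::weak_ptr<Observer>,
             std::owner_less<std::weak_ptr<Observer>>> observers;
};
```

Two consequences worth knowing: an observer may still receive one `Notify` shortly after `Unsubscribe` returns (it is alive, so this is safe, but it can be surprising), and if the client drops its last shared_ptr during a notification, the observer's destructor runs on the Subject thread.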


Now, it seems that you are stuck with unsafe memory management. This is quite a difficult situation, as you can imagine. In this case, I would suggest an experiment: an asynchronous Unsubscribe. Or rather, the call to Unsubscribe will look synchronous from the outside but be implemented asynchronously.

The idea is simple: we will use the event queue to achieve synchronization. That is:

  • the call to Unsubscribe posts an event in the queue (payload Observer*) and then waits
  • when the Subject thread has processed the Unsubscribe event(s), it wakes up the waiting thread(s)

You can use either busy-waiting or a condition variable; I would advise a condition variable unless performance dictates otherwise.
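
A sketch of the queue-based approach with a condition variable, assuming C++11 and a virtual `Notify()`. The `Event` struct, the ticket counters, `Trigger` (standing in for "something interesting happened") and `CountingObserver` are all hypothetical names made up for this example. Only the Subject thread touches the observer set, so no lock is held while notifying.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

class Observer {
public:
    virtual ~Observer() = default;
    virtual void Notify() = 0;
};

// Hypothetical observer, present only to exercise the sketch.
class CountingObserver : public Observer {
public:
    void Notify() override { ++count; }
    int count = 0;
};

class Subject {
public:
    Subject() : worker(&Subject::Run, this) {}

    // Events already queued are processed before the Subject thread stops.
    ~Subject() {
        Post(Event{Event::Stop, nullptr});
        worker.join();
    }

    void Subscribe(Observer* o) { assert(o != nullptr); Post(Event{Event::Add, o}); }

    // Returns only after the Subject thread has processed the removal.
    void Unsubscribe(Observer* o) {
        if (std::this_thread::get_id() == worker.get_id()) {
            observers.erase(o);  // called from inside Notify(): waiting would self-deadlock
            return;
        }
        const std::uint64_t ticket = Post(Event{Event::Remove, o});
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this, ticket] { return processed >= ticket; });
    }

    // Hypothetical stand-in for WaitForSomethingInterestingToHappen().
    void Trigger() { Post(Event{Event::Fire, nullptr}); }

private:
    struct Event {
        enum Kind { Add, Remove, Fire, Stop } kind;
        Observer* observer;
    };

    // Queues an event and returns its ticket (its 1-based position in the stream).
    std::uint64_t Post(const Event& e) {
        std::lock_guard<std::mutex> l(m);
        queue.push_back(e);
        cv.notify_all();
        return ++posted;
    }

    void Run() {
        for (;;) {
            Event e;
            {
                std::unique_lock<std::mutex> l(m);
                cv.wait(l, [this] { return !queue.empty(); });
                e = queue.front();
                queue.pop_front();
            }
            if (e.kind == Event::Stop) return;
            if (e.kind == Event::Add) observers.insert(e.observer);
            if (e.kind == Event::Remove) observers.erase(e.observer);
            if (e.kind == Event::Fire) {
                // Iterate a copy so Notify() may unsubscribe; no lock is held here.
                const std::vector<Observer*> snapshot(observers.begin(), observers.end());
                for (Observer* o : snapshot)
                    if (observers.count(o) != 0) o->Notify();
            }
            std::lock_guard<std::mutex> l(m);
            ++processed;      // wakes any Unsubscribe() waiting on this ticket
            cv.notify_all();
        }
    }

    std::mutex m;
    std::condition_variable cv;
    std::deque<Event> queue;
    std::uint64_t posted = 0;
    std::uint64_t processed = 0;
    std::set<Observer*> observers;  // touched only by the Subject thread
    std::thread worker;             // declared last so it starts after the rest is built
};
```

The waiting has a cost of its own: a thread blocked in `Unsubscribe` is waiting for the Subject thread, so if it holds a lock that some `Notify` needs, the two can still deadlock. Callers should avoid holding their own locks across `Unsubscribe`.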

Note: this solution completely fails to account for Subject dying prematurely.

Matthieu M. answered Sep 21 '22 23:09