Actor calculation model using boost::thread

I'm trying to implement the Actor computation model on top of threads in C++ using boost::thread, but the program throws a strange exception during execution. The exception is intermittent: sometimes the program runs correctly.

Here is my code:

actor.hpp

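#include <queue>

#include <boost/atomic.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>

class Actor {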

  public:
    typedef boost::function<int()> Job;

  private:
    std::queue<Job>             d_jobQueue;
    boost::mutex                d_jobQueueMutex;
    boost::condition_variable   d_hasJob;
    boost::atomic<bool>         d_keepWorkerRunning;
    boost::thread               d_worker;

    void workerThread();

  public:
    Actor();
    virtual ~Actor();

    void execJobAsync(const Job& job);

    int execJobSync(const Job& job);
};

actor.cpp

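#include "actor.hpp"

#include <string>

#include <boost/bind.hpp>

// ErrorUtil is declared elsewhere in the project (header not shown).

namespace {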

int executeJobSync(std::string          *error,
                   boost::promise<int> *promise,
                   const Actor::Job     *job)
{
    int rc = (*job)();

    promise->set_value(rc);
    return 0;
}

}

void Actor::workerThread()
{
    while (d_keepWorkerRunning) try {
        Job job;
        {
            boost::unique_lock<boost::mutex> g(d_jobQueueMutex);

            while (d_jobQueue.empty()) {
                d_hasJob.wait(g);
            }

            job = d_jobQueue.front();
            d_jobQueue.pop();
        }

        job();
    }
    catch (...) {
        // Log error
    }
}

void Actor::execJobAsync(const Job& job)
{
    boost::mutex::scoped_lock g(d_jobQueueMutex);
    d_jobQueue.push(job);
    d_hasJob.notify_one();
}

int Actor::execJobSync(const Job& job)
{
    std::string error;
    boost::promise<int> promise;
    boost::unique_future<int> future = promise.get_future();

    {
        boost::mutex::scoped_lock g(d_jobQueueMutex);
        d_jobQueue.push(boost::bind(executeJobSync, &error, &promise, &job));
        d_hasJob.notify_one();
    }

    int rc = future.get();

    if (rc) {
        ErrorUtil::setLastError(rc, error.c_str());
    }

    return rc;
}

Actor::Actor()
: d_keepWorkerRunning(true)
, d_worker(&Actor::workerThread, this)
{
}

Actor::~Actor()
{
    d_keepWorkerRunning = false;
    {
        boost::mutex::scoped_lock g(d_jobQueueMutex);
        d_hasJob.notify_one();
    }
    d_worker.join();
}

The exception actually thrown is boost::thread_interrupted, at the line int rc = future.get();. But from the Boost docs I can't work out the reason for this exception. The docs say:

Throws: - boost::thread_interrupted if the result associated with *this is not ready at the point of the call, and the current thread is interrupted.

But my worker thread can't be in an interrupted state.

When I ran gdb with "catch throw", the backtrace looked like this:

throw thread_interrupted

boost::detail::interruption_checker::check_for_interruption

boost::detail::interruption_checker::interruption_checker

boost::condition_variable::wait

boost::detail::future_object_base::wait_internal

boost::detail::future_object_base::wait

boost::detail::future_object::get

boost::unique_future::get

I looked into the Boost sources but can't work out why interruption_checker decided that the worker thread was interrupted.

Could a C++ guru please help me? What do I need to change to get correct code? I'm using:

boost 1_53

Linux version 2.6.18-194.32.1.el5 Red Hat 4.1.2-48

gcc 4.7

EDIT

Fixed it! Thanks to Evgeny Panasyuk and Lazin. The problem was in TLS management. boost::thread and boost::thread_specific_ptr use the same TLS storage for their own purposes. In my case there was a problem when they both tried to modify this storage on creation (unfortunately I didn't work out in detail why this happens), so the TLS became corrupted.

I replaced boost::thread_specific_ptr in my code with a variable declared with the __thread specifier.
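
As a rough illustration of that replacement, here is a minimal sketch of a `__thread` variable. It is not the code from the project: the names `t_callCount`, `bumpCounter` and `bumpCounterOnNewThread` are hypothetical, and std::thread stands in for boost::thread. Note that `__thread` is a GCC/Clang extension and only works for types with trivial construction and destruction.

```cpp
#include <thread>

// Hypothetical per-thread counter: every thread gets its own zero-initialized copy.
static __thread int t_callCount = 0;

// Increments the calling thread's private counter n times and returns its value.
int bumpCounter(int n)
{
    for (int i = 0; i < n; ++i) {
        ++t_callCount;
    }
    return t_callCount;
}

// Runs bumpCounter(n) on a fresh thread and returns the value that thread saw.
int bumpCounterOnNewThread(int n)
{
    int seen = -1;
    std::thread t([&seen, n] { seen = bumpCounter(n); });
    t.join();  // join makes the write to 'seen' visible here
    return seen;
}
```

Calling bumpCounter on one thread does not affect the value seen on another, which is the property boost::thread_specific_ptr was providing.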

Off-topic: while debugging I found memory corruption in an external library and fixed it =)

EDIT 2

I found the exact problem... It is a bug in GCC =) The _GLIBCXX_DEBUG compilation flag breaks the ABI. You can see the discussion on the Boost bug tracker: https://svn.boost.org/trac/boost/ticket/7666

asked Oct 25 '13 15:10 by inkooboo


2 Answers

I have found several bugs:


The Actor::workerThread function unlocks d_jobQueueMutex twice. The first unlock is the manual d_jobQueueMutex.unlock();, the second happens in the destructor of boost::unique_lock<boost::mutex>.

You should prevent one of the unlocks, for example by releasing the association between the unique_lock and the mutex:

g.release(); // <------------ PATCH
d_jobQueueMutex.unlock();

Or add an additional code block plus a default-constructed Job.


It is possible that workerThread will never leave the following loop:

while (d_jobQueue.empty()) {
    d_hasJob.wait(g);
}

Imagine the following case: d_jobQueue is empty and Actor::~Actor() is called. It sets the flag and notifies the worker thread:

d_keepWorkerRunning = false;
d_hasJob.notify_one();

workerThread wakes up inside the while loop, sees that the queue is still empty, and goes back to sleep.

It is common practice to send a special final job to stop the worker thread:

~Actor()
{
    execJobSync([this]()->int
    {
        d_keepWorkerRunning = false;
        return 0;
    });
    d_worker.join();
}

In this case, d_keepWorkerRunning does not need to be atomic, because it is then read and written only on the worker thread.
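
For comparison, the hang described above can also be avoided by making the wait condition itself cover shutdown. The following is only a sketch of that alternative, not the code from this answer: `MiniActor` and `post` are hypothetical names, and the std:: threading types stand in for their boost counterparts.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical minimal actor; std:: types stand in for the boost ones.
class MiniActor {
    std::queue<std::function<void()>> d_jobs;
    std::mutex                        d_mutex;
    std::condition_variable           d_hasJob;
    bool                              d_stop = false;  // guarded by d_mutex
    std::thread                       d_worker;        // declared last: starts after the rest

    void run()
    {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> g(d_mutex);
                // Wake up for a new job OR for a stop request.
                d_hasJob.wait(g, [this] { return d_stop || !d_jobs.empty(); });
                if (d_jobs.empty()) {
                    return;  // stop requested and nothing left to run
                }
                job = std::move(d_jobs.front());
                d_jobs.pop();
            }
            job();  // run the job outside the lock
        }
    }

  public:
    MiniActor() : d_worker(&MiniActor::run, this) {}

    ~MiniActor()
    {
        {
            std::lock_guard<std::mutex> g(d_mutex);
            d_stop = true;
        }
        d_hasJob.notify_one();
        d_worker.join();  // pending jobs are drained before the worker exits
    }

    void post(std::function<void()> job)
    {
        {
            std::lock_guard<std::mutex> g(d_mutex);
            d_jobs.push(std::move(job));
        }
        d_hasJob.notify_one();
    }
};
```

Because the stop flag is set under the same mutex that the worker holds while evaluating the wait condition, the notification from the destructor cannot be missed, even when the queue is empty.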


LIVE DEMO on Coliru


EDIT:

I have added the event queue code to your example.

You have a concurrent queue in both EventQueueImpl and Actor, but for different types. The common part can be extracted into a separate entity, concurrent_queue<T>, which works for any type. It is much easier to debug and test a queue in one place than to chase bugs scattered over different classes.

So you can try this concurrent_queue<T> (on Coliru).
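
To give an idea of what such a shared component might look like, here is a minimal sketch of a blocking queue template. It is an assumption about the general shape, written with std:: primitives; the Coliru version linked in this answer may differ, and the member names `push`, `pop` and `try_pop` are hypothetical.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical blocking queue usable for jobs, events, or any other type.
template <typename T>
class concurrent_queue {
    std::queue<T>           d_items;
    std::mutex              d_mutex;
    std::condition_variable d_notEmpty;

  public:
    void push(T value)
    {
        {
            std::lock_guard<std::mutex> g(d_mutex);
            d_items.push(std::move(value));
        }
        d_notEmpty.notify_one();
    }

    // Blocks until an item is available, then removes and returns it.
    T pop()
    {
        std::unique_lock<std::mutex> g(d_mutex);
        d_notEmpty.wait(g, [this] { return !d_items.empty(); });
        T value = std::move(d_items.front());
        d_items.pop();
        return value;
    }

    // Non-blocking variant: returns false if the queue was empty.
    bool try_pop(T& out)
    {
        std::lock_guard<std::mutex> g(d_mutex);
        if (d_items.empty()) {
            return false;
        }
        out = std::move(d_items.front());
        d_items.pop();
        return true;
    }
};
```

With something like this in place, both Actor and EventQueueImpl could hold a concurrent_queue of their own element type instead of each managing a queue, a mutex and a condition variable by hand.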

answered Nov 15 '22 09:11 by Evgeny Panasyuk


This is just a guess. I think some code may actually be calling boost::thread::interrupt(). You can set a breakpoint on this function and see which code is responsible. You can also test for interruption in execJobSync:

int Actor::execJobSync(const Job& job)
{
    if (boost::this_thread::interruption_requested())
        std::cout << "Interruption requested!" << std::endl;
    std::string error;
    boost::promise<int> promise;
    boost::unique_future<int> future = promise.get_future();
    // ... rest of the function unchanged
}

The most suspicious code in this case is any code that holds a reference to the thread object.

It is good practice to make your boost::thread code interruption-aware anyway. It is also possible to disable interruption for a given scope.

If this is not the case, you need to check code that works with thread-local storage, because the thread interruption flag is stored in TLS. Maybe some of your code overwrites it. You can check for interruption before and after such a code fragment.

Another possibility is that your memory is corrupted, if no code is calling boost::thread::interrupt() and you don't work with TLS. This is the hardest case; try a dynamic analyzer such as valgrind or the clang memory sanitizer.

Off-topic: You probably need to use a concurrent queue. std::queue will be very slow because of high memory contention, and you will end up with poor cache performance. A good concurrent queue allows your code to enqueue and dequeue elements in parallel.

Also, an actor is not supposed to execute arbitrary code. An actor's queue must receive simple messages, not functions! You're writing a job queue :) You should take a look at an actor system like Akka or libcppa.
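
To illustrate the difference between a job queue and a message-driven actor, here is a small sketch. Everything in it (`Message`, `AccountActor`, `send`, `processAll`) is hypothetical, and threading is left out on purpose so that only the message handling is visible; a real actor would drain its mailbox on its own thread.

```cpp
#include <queue>

// Hypothetical message: plain data describing a request, not a callable.
struct Message {
    enum Kind { Deposit, Withdraw };
    Kind kind;
    int  amount;
};

// Hypothetical actor: it owns its state and decides how to react to each message.
class AccountActor {
    std::queue<Message> d_mailbox;
    int                 d_balance;

    void handle(const Message& m)
    {
        switch (m.kind) {
          case Message::Deposit:  d_balance += m.amount; break;
          case Message::Withdraw: d_balance -= m.amount; break;
        }
    }

  public:
    AccountActor() : d_balance(0) {}

    // Senders only enqueue data; they never touch the actor's state directly.
    void send(const Message& m) { d_mailbox.push(m); }

    // Handles every pending message and returns the resulting balance.
    int processAll()
    {
        while (!d_mailbox.empty()) {
            handle(d_mailbox.front());
            d_mailbox.pop();
        }
        return d_balance;
    }
};
```

The sender cannot make the actor run arbitrary code: it can only choose among the message kinds the actor understands.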

answered Nov 15 '22 08:11 by Evgeny Lazin