I'm trying to implement the Actor model of computation on top of threads in C++ using boost::thread. But the program throws a strange exception during execution. The exception is intermittent: sometimes the program works correctly.
Here is my code:
actor.hpp
class Actor {
public:
typedef boost::function<int()> Job;
private:
std::queue<Job> d_jobQueue;
boost::mutex d_jobQueueMutex;
boost::condition_variable d_hasJob;
boost::atomic<bool> d_keepWorkerRunning;
boost::thread d_worker;
void workerThread();
public:
Actor();
virtual ~Actor();
void execJobAsync(const Job& job);
int execJobSync(const Job& job);
};
actor.cpp
namespace {
int executeJobSync(std::string *error,
boost::promise<int> *promise,
const Actor::Job *job)
{
int rc = (*job)();
promise->set_value(rc);
return 0;
}
}
void Actor::workerThread()
{
while (d_keepWorkerRunning) try {
Job job;
{
boost::unique_lock<boost::mutex> g(d_jobQueueMutex);
while (d_jobQueue.empty()) {
d_hasJob.wait(g);
}
job = d_jobQueue.front();
d_jobQueue.pop();
}
job();
}
catch (...) {
// Log error
}
}
void Actor::execJobAsync(const Job& job)
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_jobQueue.push(job);
d_hasJob.notify_one();
}
int Actor::execJobSync(const Job& job)
{
std::string error;
boost::promise<int> promise;
boost::unique_future<int> future = promise.get_future();
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_jobQueue.push(boost::bind(executeJobSync, &error, &promise, &job));
d_hasJob.notify_one();
}
int rc = future.get();
if (rc) {
ErrorUtil::setLastError(rc, error.c_str());
}
return rc;
}
Actor::Actor()
: d_keepWorkerRunning(true)
, d_worker(&Actor::workerThread, this)
{
}
Actor::~Actor()
{
d_keepWorkerRunning = false;
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_hasJob.notify_one();
}
d_worker.join();
}
The exception that is actually thrown is boost::thread_interrupted, at the line int rc = future.get();. But from the Boost docs I can't work out the reason for this exception. The docs say:
Throws: - boost::thread_interrupted if the result associated with *this is not ready at the point of the call, and the current thread is interrupted.
But my worker thread can't be in an interrupted state.
When I ran the program under gdb with "catch throw", the backtrace looked like this:
throw thread_interrupted
boost::detail::interruption_checker::check_for_interruption
boost::detail::interruption_checker::interruption_checker
boost::condition_variable::wait
boost::detail::future_object_base::wait_internal
boost::detail::future_object_base::wait
boost::detail::future_object::get
boost::unique_future::get
I looked into the Boost sources but can't see why interruption_checker decided that the worker thread is interrupted.
So, C++ gurus, please help me. What do I need to do to get correct code? I'm using:
boost 1_53
Linux version 2.6.18-194.32.1.el5 Red Hat 4.1.2-48
gcc 4.7
EDIT
Fixed it! Thanks to Evgeny Panasyuk and Lazin. The problem was in TLS management. boost::thread and boost::thread_specific_ptr use the same TLS storage for their own purposes. In my case there was a problem when they both tried to change this storage on creation (unfortunately I didn't work out in detail why this happens), so the TLS became corrupted.
I replaced boost::thread_specific_ptr in my code with a variable declared with the __thread specifier.
Offtopic: During debugging I found a memory corruption in an external library and fixed it =)
EDIT 2 I found the exact problem... It is a bug in GCC =) The _GLIBCXX_DEBUG compilation flag breaks the ABI. You can see the discussion on the Boost bug tracker: https://svn.boost.org/trac/boost/ticket/7666
I have found several bugs:
The Actor::workerThread function does a double unlock on d_jobQueueMutex. The first unlock is the manual d_jobQueueMutex.unlock();, the second is in the destructor of boost::unique_lock<boost::mutex>.
You should prevent one of the unlocks, for example by releasing the association between the unique_lock and the mutex:
g.release(); // <------------ PATCH
d_jobQueueMutex.unlock();
Or add an additional code block plus a default-constructed Job.
It is possible that workerThread will never leave the following loop:
while (d_jobQueue.empty()) {
d_hasJob.wait(g);
}
Imagine the following case: d_jobQueue is empty, Actor::~Actor() is called, and it sets the flag and notifies the worker thread:
d_keepWorkerRunning = false;
d_hasJob.notify_one();
workerThread wakes up in the while loop, sees that the queue is empty, and sleeps again.
It is common practice to send a special final job to stop the worker thread:
~Actor()
{
execJobSync([this]()->int
{
d_keepWorkerRunning = false;
return 0;
});
d_worker.join();
}
In this case, d_keepWorkerRunning is not required to be atomic.
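An alternative to the final job is to check the stop flag inside the wait condition, so a notification from the destructor is enough to leave the loop. The following is only a minimal sketch under stated assumptions: it uses the C++11 std:: equivalents instead of boost:: so that it is self-contained, and the names Worker, post, and runJobs are hypothetical, not part of the code in the question.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical minimal worker; std:: types stand in for the boost:: ones.
class Worker {
public:
    typedef std::function<int()> Job;

    Worker() : d_stop(false), d_thread(&Worker::run, this) {}

    ~Worker() {
        {
            std::lock_guard<std::mutex> g(d_mutex);
            d_stop = true;  // written under the mutex, so no atomic is needed
        }
        d_hasJob.notify_one();
        d_thread.join();
    }

    void post(const Job& job) {
        assert(static_cast<bool>(job));  // reject empty function objects
        {
            std::lock_guard<std::mutex> g(d_mutex);
            d_queue.push(job);
        }
        d_hasJob.notify_one();
    }

private:
    void run() {
        for (;;) {
            Job job;
            {
                std::unique_lock<std::mutex> g(d_mutex);
                // Wake up for a new job or for a stop request.
                d_hasJob.wait(g, [this] { return d_stop || !d_queue.empty(); });
                if (d_queue.empty()) return;  // stop requested, nothing left to run
                job = d_queue.front();
                d_queue.pop();
            }
            try { job(); } catch (...) { /* log error */ }
        }
    }

    std::queue<Job> d_queue;
    std::mutex d_mutex;
    std::condition_variable d_hasJob;
    bool d_stop;
    std::thread d_thread;  // declared last so the other members exist before it starts
};

// Posts n jobs and returns how many had run once the Worker was destroyed.
int runJobs(int n) {
    int counter = 0;
    {
        Worker w;
        for (int i = 0; i < n; ++i) w.post([&counter] { return ++counter; });
    }  // destructor lets the worker drain the queue, then joins
    return counter;
}
```

In this sketch the worker finishes the jobs that are already queued before it exits. Whether pending jobs should be run or dropped on shutdown is a design choice that the question does not specify.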
LIVE DEMO on Coliru
EDIT:
I have added the event queue code to your example.
You have a concurrent queue in both EventQueueImpl and Actor, but for different types. It is possible to extract the common part into a separate entity, concurrent_queue<T>, which works for any type. It would be much easier to debug and test the queue in one place than to catch bugs scattered over different classes.
So, you can try to use this concurrent_queue<T> (on Coliru)
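The linked code is not reproduced here, so the following is only a rough sketch of what such a queue could look like, not the version from the Coliru demo. It assumes C++11 std:: primitives in place of the boost:: ones, and the helper sumThroughQueue is a hypothetical function added purely to exercise the queue.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Sketch of a blocking queue shared between producer and consumer threads.
template <typename T>
class concurrent_queue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> g(d_mutex);
            d_items.push(std::move(value));
        }
        d_notEmpty.notify_one();
    }

    // Blocks until an item is available, then removes and returns it.
    T pop() {
        std::unique_lock<std::mutex> g(d_mutex);
        d_notEmpty.wait(g, [this] { return !d_items.empty(); });
        T value = std::move(d_items.front());
        d_items.pop();
        return value;
    }

private:
    std::queue<T> d_items;
    std::mutex d_mutex;
    std::condition_variable d_notEmpty;
};

// A producer thread pushes 1..n; the caller pops n items and sums them.
int sumThroughQueue(int n) {
    concurrent_queue<int> q;
    std::thread producer([&q, n] { for (int i = 1; i <= n; ++i) q.push(i); });
    int sum = 0;
    for (int i = 0; i < n; ++i) sum += q.pop();
    producer.join();
    return sum;
}
```

With a queue like this, both Actor and EventQueueImpl could hold a concurrent_queue of their own element type and drop their private mutex and condition variable.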
This is just a guess. I think that some code may actually be calling boost::thread::interrupt(). You can set a breakpoint on this function and see which code is responsible. You can also test for interruption in execJobSync:
int Actor::execJobSync(const Job& job)
{
if (boost::this_thread::interruption_requested())
std::cout << "Interruption requested!" << std::endl;
std::string error;
boost::promise<int> promise;
boost::unique_future<int> future = promise.get_future();
The most suspicious code in this case is any code that holds a reference to the thread object.
It is good practice to make your boost::thread code interruption-aware anyway. It is also possible to disable interruption for a given scope.
If this is not the case, you need to check the code that works with thread-local storage, because the thread interruption flag is stored in TLS. Maybe some of your code overwrites it. You can check for interruption before and after such a code fragment.
Another possibility is that your memory is corrupted, if no code is calling boost::thread::interrupt() and you don't work with TLS. This is the hardest case; try some dynamic analyzer, such as valgrind or the clang memory sanitizer.
Offtopic: You probably need to use a real concurrent queue. A std::queue behind a single mutex will be slow under high contention and you will end up with poor cache performance. A good concurrent queue allows your code to enqueue and dequeue elements in parallel.
Also, an actor is not something that is supposed to execute arbitrary code. An actor's queue must receive simple messages, not functions! You're writing a job queue :) You should take a look at an actor system such as Akka or libcppa.
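To illustrate the difference, here is a small sketch of an actor whose mailbox carries plain data messages instead of function objects. Everything in it (Message, CounterActor, send, stopAndGetTotal, addAll) is hypothetical and only meant to show the idea; it is not how Akka or libcppa implement actors, and it uses C++11 std:: threads rather than boost::thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical message type: plain data, no code.
struct Message {
    enum Kind { Add, Stop } kind;
    int amount;
};

class CounterActor {
public:
    CounterActor() : d_total(0), d_thread(&CounterActor::run, this) {}

    void send(const Message& m) {
        {
            std::lock_guard<std::mutex> g(d_mutex);
            d_mailbox.push(m);
        }
        d_hasMail.notify_one();
    }

    // Sends Stop, waits for the actor thread, and returns the final total.
    // Must be called exactly once before the object is destroyed.
    int stopAndGetTotal() {
        Message stop = { Message::Stop, 0 };
        send(stop);
        d_thread.join();
        return d_total;
    }

private:
    void run() {
        for (;;) {
            Message m;
            {
                std::unique_lock<std::mutex> g(d_mutex);
                d_hasMail.wait(g, [this] { return !d_mailbox.empty(); });
                m = d_mailbox.front();
                d_mailbox.pop();
            }
            if (m.kind == Message::Stop) return;
            d_total += m.amount;  // state is touched only by the actor thread
        }
    }

    std::queue<Message> d_mailbox;
    std::mutex d_mutex;
    std::condition_variable d_hasMail;
    int d_total;
    std::thread d_thread;  // declared last so the other members exist before it starts
};

// Sends Add messages for 1..n and returns the total the actor accumulated.
int addAll(int n) {
    CounterActor actor;
    for (int i = 1; i <= n; ++i) {
        Message m = { Message::Add, i };
        actor.send(m);
    }
    return actor.stopAndGetTotal();
}
```

The behaviour lives inside the actor and the callers only describe what happened, which is the distinction the paragraph above draws between an actor and a job queue.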