In this question I described a boost::asio and boost::coroutine usage pattern which causes random crashes of my application, and I published an extract from my code together with the valgrind and GDB output.
In order to investigate the problem further, I created a smaller proof-of-concept application which applies the same pattern. The same problem arises in the smaller program, whose source I publish here.
The code starts a few threads and creates a connection pool with a few dummy connections (both counts are supplied by the user). The additional arguments are unsigned integers which play the role of fake requests. The dummy implementation of the sendRequest
function just starts an asynchronous timer that waits for a number of seconds equal to the input number, and yields from the function.
Can someone see the problem with this code and propose a fix for it?
#include "asiocoroutineutils.h"
#include "concurrentqueue.h"
#include <iostream>
#include <thread>
#include <boost/lexical_cast.hpp>
using namespace std;
using namespace boost;
using namespace utils;
// Log-line prefix: expands to the calling thread's id followed by ": ",
// intended to be streamed right after `cout <<` / `cerr <<`.
// NOTE(review): this is an object-like macro named `id`, so the preprocessor
// replaces every later `id` token in this translation unit, including any
// variable, member or parameter that happens to be called `id`.
#define id this_thread::get_id() << ": "
// ---------------------------------------------------------------------------
/*!
* \brief This is a fake Connection class
*/
class Connection
{
public:
Connection(unsigned connectionId)
: _id(connectionId)
{
}
unsigned getId() const
{
return _id;
}
void sendRequest(asio::io_service& ioService,
unsigned seconds,
AsioCoroutineJoinerProxy,
asio::yield_context yield)
{
cout << id << "Connection " << getId()
<< " Start sending: " << seconds << endl;
// waiting on this timer is palceholder for any asynchronous operation
asio::steady_timer timer(ioService);
timer.expires_from_now(chrono::seconds(seconds));
coroutineAsyncWait(timer, yield);
cout << id << "Connection " << getId()
<< " Received response: " << seconds << endl;
}
private:
unsigned _id;
};
// Exclusive owner of a Connection; moved between the pool and its borrower.
using ConnectionPtr = std::unique_ptr<Connection>;
// Shared handle to a timer on which a coroutine waits for a free connection.
using TimerPtr = std::shared_ptr<asio::steady_timer>;
// ---------------------------------------------------------------------------
class ConnectionPool
{
public:
ConnectionPool(size_t connectionsCount)
{
for(size_t i = 0; i < connectionsCount; ++i)
{
cout << "Creating connection: " << i << endl;
_connections.emplace_back(new Connection(i));
}
}
ConnectionPtr getConnection(TimerPtr timer,
asio::yield_context& yield)
{
lock_guard<mutex> lock(_mutex);
while(_connections.empty())
{
cout << id << "There is no free connection." << endl;
_timers.emplace_back(timer);
timer->expires_from_now(
asio::steady_timer::clock_type::duration::max());
_mutex.unlock();
coroutineAsyncWait(*timer, yield);
_mutex.lock();
cout << id << "Connection was freed." << endl;
}
cout << id << "Getting connection: "
<< _connections.front()->getId() << endl;
ConnectionPtr connection = std::move(_connections.front());
_connections.pop_front();
return connection;
}
void addConnection(ConnectionPtr connection)
{
lock_guard<mutex> lock(_mutex);
cout << id << "Returning connection " << connection->getId()
<< " to the pool." << endl;
_connections.emplace_back(std::move(connection));
if(_timers.empty())
return;
auto timer = _timers.back();
_timers.pop_back();
auto& ioService = timer->get_io_service();
ioService.post([timer]()
{
cout << id << "Wake up waiting getConnection." << endl;
timer->cancel();
});
}
private:
mutex _mutex;
deque<ConnectionPtr> _connections;
deque<TimerPtr> _timers;
};
// Exclusive owner of a ConnectionPool.
using ConnectionPoolPtr = unique_ptr<ConnectionPool>;
// ---------------------------------------------------------------------------
/*!
 * \brief Borrows a connection from a ConnectionPool for the lifetime of the
 *        object.
 *
 * The constructor obtains the connection through ConnectionPool::getConnection
 * and the destructor hands it back through ConnectionPool::addConnection.
 */
class ScopedConnection
{
public:
    /*!
     * \param pool      Pool the connection is taken from and returned to.
     * \param ioService Service on which the waiting timer is created.
     * \param yield     Context of the calling coroutine, forwarded to
     *                  getConnection().
     */
    ScopedConnection(ConnectionPool& pool,
                     asio::io_service& ioService,
                     asio::yield_context& yield)
        : _pool(pool)
        , _connection(_pool.getConnection(
              make_shared<asio::steady_timer>(ioService), yield))
    {
    }

    /*! \brief Hands the borrowed connection back to the pool. */
    ~ScopedConnection()
    {
        _pool.addConnection(std::move(_connection));
    }

    /*! \brief Gives access to the borrowed connection. */
    Connection& get()
    {
        return *_connection;
    }

private:
    ConnectionPool& _pool;
    ConnectionPtr _connection;
};
// ---------------------------------------------------------------------------
/*!
 * \brief Coroutine body that handles one fake request.
 *
 * Borrows a connection from \a pool, spawns Connection::sendRequest on it as
 * a child coroutine and waits on a joiner until that child has finished.
 *
 * \param ioService Service the child coroutine and all timers run on.
 * \param pool      Pool the connection is borrowed from.
 * \param seconds   Fake request: the number of seconds the child waits.
 * \param yield     Context of this coroutine.
 */
void sendRequest(asio::io_service& ioService,
                 ConnectionPool& pool,
                 unsigned seconds,
                 asio::yield_context yield)
{
    cout << id << "Constructing request ..." << endl;
    AsioCoroutineJoiner joiner(ioService);
    ScopedConnection connection(pool, ioService, yield);
    // Bind the address of the borrowed connection. Binding the reference
    // returned by get() would make bind store a copy of the Connection, so
    // the child would not operate on the pooled instance. The pointer stays
    // valid because `connection` is only destroyed after join() below.
    asio::spawn(ioService, bind(&Connection::sendRequest,
                                &connection.get(),
                                std::ref(ioService),
                                seconds,
                                AsioCoroutineJoinerProxy(joiner),
                                placeholders::_1));
    joiner.join(yield);
    cout << id << "Processing response ..." << endl;
}
// ---------------------------------------------------------------------------
/*!
 * \brief Entry point of a worker thread.
 *
 * Pops requests from \a requests until tryPop() reports failure, spawning
 * one sendRequest coroutine per request on a thread-local io_service, then
 * runs that io_service. A std::exception escaping from this work is
 * reported on cerr instead of leaving the thread.
 */
void threadFunc(ConnectionPool& pool,
                ConcurrentQueue<unsigned>& requests)
{
    try
    {
        asio::io_service ioService;
        for(unsigned request; requests.tryPop(request); )
        {
            cout << id << "Scheduling request: " << request << endl;
            asio::spawn(ioService,
                        [&ioService, &pool, request](asio::yield_context yield)
                        {
                            sendRequest(ioService, pool, request, yield);
                        });
        }
        // All coroutines were only scheduled above; they execute here.
        ioService.run();
    }
    catch(const std::exception& error)
    {
        cerr << id << "Error: " << error.what() << endl;
    }
}
// ---------------------------------------------------------------------------
int main(int argc, char* argv[])
{
if(argc < 3)
{
cout << "Usage: ./async_request poolSize threadsCount r0 r1 ..."
<< endl;
return -1;
}
try
{
auto poolSize = lexical_cast<size_t>(argv[1]);
auto threadsCount = lexical_cast<size_t>(argv[2]);
ConcurrentQueue<unsigned> requests;
for(int i = 3; i < argc; ++i)
{
auto request = lexical_cast<unsigned>(argv[i]);
requests.tryPush(request);
}
ConnectionPoolPtr pool(new ConnectionPool(poolSize));
vector<unique_ptr<thread>> threads;
for(size_t i = 0; i < threadsCount; ++i)
{
threads.emplace_back(
new thread(threadFunc, std::ref(*pool), std::ref(requests)));
}
for_each(threads.begin(), threads.end(), mem_fn(&thread::join));
}
catch(const std::exception& e)
{
cerr << "Error: " << e.what() << endl;
}
return 0;
}
Here are some helper utilities used by the above code:
#pragma once
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/spawn.hpp>
namespace utils
{
/*!
 * \brief Suspends the calling coroutine until \a timer completes its wait.
 *
 * A wait that ends with boost::asio::error::operation_aborted (the timer
 * was cancelled) is treated like a normal completion.
 *
 * \throws std::runtime_error carrying the error message for any other error.
 */
inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
                               boost::asio::yield_context& yield)
{
    boost::system::error_code waitResult;
    timer.async_wait(yield[waitResult]);

    const bool cancelled =
        (waitResult == boost::asio::error::operation_aborted);
    if(!waitResult || cancelled)
        return;

    throw std::runtime_error(waitResult.message());
}
/*!
 * \brief Lets one coroutine wait until a counted set of participants is gone.
 *
 * Participants are counted with inc()/dec() (normally through
 * AsioCoroutineJoinerProxy). join() parks the caller on a timer that never
 * expires; the dec() call that brings the count to zero cancels that timer.
 * No locking is performed on the counter in this class.
 */
class AsioCoroutineJoiner
{
public:
    /*! \brief Creates a joiner whose internal timer runs on \a io. */
    explicit AsioCoroutineJoiner(boost::asio::io_service& io)
        : _timer(io), _count(0) {}

    /*!
     * \brief Suspends the calling coroutine until the count drops to zero.
     *
     * Returns immediately when nothing is outstanding. A wait started at
     * that point could never be woken, because the cancel issued by the
     * last dec() has already happened.
     */
    void join(boost::asio::yield_context yield)
    {
        if(0 == _count)
            return;
        _timer.expires_from_now(
            boost::asio::steady_timer::clock_type::duration::max());
        coroutineAsyncWait(_timer, yield);
    }

    /*! \brief Registers one more participant. */
    void inc()
    {
        ++_count;
    }

    /*!
     * \brief Unregisters one participant and wakes join() when it was the
     *        last one.
     */
    void dec()
    {
        assert(_count > 0);
        --_count;
        if(0 == _count)
            _timer.cancel();
    }

private:
    boost::asio::steady_timer _timer; // timer join() waits on
    std::size_t _count;               // participants still outstanding
}; // AsioCoroutineJoiner class
/*!
 * \brief Counts itself as a participant of an AsioCoroutineJoiner.
 *
 * Every live proxy object, including each copy, holds one inc() on the
 * joiner and releases it with dec() when it is destroyed.
 */
class AsioCoroutineJoinerProxy
{
public:
    /*! \brief Registers a new participant on \a joiner. */
    AsioCoroutineJoinerProxy(AsioCoroutineJoiner& joiner) : _joiner(joiner)
    {
        _joiner.inc();
    }

    /*! \brief A copy is a further participant on the same joiner. */
    AsioCoroutineJoinerProxy(const AsioCoroutineJoinerProxy& other)
        : _joiner(other._joiner)
    {
        _joiner.inc();
    }

    /*! \brief Unregisters this participant. */
    ~AsioCoroutineJoinerProxy()
    {
        _joiner.dec();
    }

private:
    AsioCoroutineJoiner& _joiner;
}; // AsioCoroutineJoinerProxy class
} // utils namespace
For completeness, the last missing part of the code is the ConcurrentQueue class. It is too long to paste here, but if you want, you can find it here.
Example usage of the application is:
./connectionpooltest 3 3 5 7 8 1 0 9 2 4 3 6
where the first number 3 is the number of fake connections and the second number 3 is the number of threads used. The numbers after them are the fake requests.
The output of valgrind and GDB is the same as in the question mentioned above.
Used version of boost is 1.57. The compiler is GCC 4.8.3. The operating system is CentOS Linux release 7.1.1503
It seems that all the valgrind errors are caused by the BOOST_USE_VALGRIND macro not being defined, as Tanner Sansbury points out in a comment on this question. Apart from this, the program seems to be correct.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With