
boost::asio thread pool implementation for occasionally synchronized tasks

I have a "main" function that performs many small, independent tasks each once per time step. However, after each time step, I must wait for all of the tasks to complete before stepping forward.

I want to make the program multithreaded. I have tried implementations with the boost-offshoot threadpool, and I've tried using a vector of (shared pointers to) threads, and I've tried the asio threadpool ideas (using an io_service, establishing some work, then distributing run to the threads and posting handlers to the io_service).

All of these seem to have a lot of overhead from creating and destroying threads for my "many small tasks." I want a way, preferably using the asio tools, to instantiate one io_service and one thread_group, post handlers to the io_service, and wait for a single time step's work to finish before posting more tasks. Is there a good way to do this? Here's the (stripped-down) code for what I have working now:

boost::asio::io_service io_service;
for(int theTime = 0; theTime != totalTime; ++theTime)
{
    io_service.reset();
    boost::thread_group threads;
    // scoping to destroy the work object after work is finished being assigned
    {
        boost::asio::io_service::work work(io_service);
        for (int i = 0; i < maxNumThreads; ++i)
        {
            threads.create_thread(boost::bind(&boost::asio::io_service::run,
                &io_service));
        }

        for(int i = 0; i < numSmallTasks; ++i)
        {
            io_service.post(boost::bind(&process_data, i, theTime));
        }
    }
    threads.join_all(); 
}

Here's what I would rather have (but don't know how to implement):

boost::asio::io_service io_service;
boost::thread_group threads;
boost::asio::io_service::work work(io_service);
for (int i = 0; i < maxNumThreads; ++i)
{
    threads.create_thread(boost::bind(&boost::asio::io_service::run,
         &io_service));
}

for(int theTime = 0; theTime != totalTime; ++theTime)
{
    for(int i = 0; i < numSmallTasks; ++i)
    {
        io_service.post(boost::bind(&process_data, i, theTime));
    }
    // wait here until all of these tasks are finished before looping 
    // **** how do I do this? *****
}
// destroy work later and join all threads later...
John Doe asked Oct 30 '12 19:10



1 Answer

You can wrap each task in a packaged_task, collect the resulting futures, and synchronize on them with boost::wait_for_all(). This lets you reason in terms of units of work completed rather than threads.

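// Signature matches the bind call below; the int return type is assumed by this answer
int process_data(int i, int theTime) {...}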

// Pending futures
std::vector<boost::unique_future<int>> pending_data;

for(int i = 0; i < numSmallTasks; ++i)
{
   // Create task and corresponding future
   // Using shared ptr and binding operator() trick because
   // packaged_task is non-copyable, but asio::io_service::post requires argument to be copyable

   // Boost 1.51 syntax.
   // With Boost 1.53+ (or C++11 std::packaged_task) use the signature form: boost::packaged_task<int()>
   typedef boost::packaged_task<int> task_t;

   boost::shared_ptr<task_t> task = boost::make_shared<task_t>(
      boost::bind(&process_data, i, theTime));

   boost::unique_future<int> fut = task->get_future();

   pending_data.push_back(std::move(fut));
   io_service.post(boost::bind(&task_t::operator(), task));    
}

// After loop - wait until all futures are evaluated
boost::wait_for_all(pending_data.begin(), pending_data.end()); 
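
For readers who want to try the pattern without Boost, here is a minimal std-only sketch of the same idea: worker threads are created once, each task is wrapped in a packaged_task, and every time step waits on its futures before the next one starts. SimplePool, run_time_steps and the body of process_data are hypothetical names made up for this illustration; SimplePool is only a stand-in for the io_service plus thread_group pair, not the asio API.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical placeholder for the real per-task work.
int process_data(int i, int theTime) { return i * theTime; }

// Hypothetical std-only stand-in for io_service + thread_group:
// the worker threads are created once and reused for every time step.
class SimplePool {
public:
    explicit SimplePool(std::size_t num_threads) {
        for (std::size_t i = 0; i < num_threads; ++i)
            workers_.emplace_back([this] { worker_loop(); });
    }

    ~SimplePool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_all();
        for (std::thread& t : workers_) t.join();
    }

    // Rough analogue of io_service::post that also hands back a future.
    std::future<int> post(std::function<int()> fn) {
        // shared_ptr for the same reason as above: packaged_task is
        // move-only, but std::function needs a copyable callable.
        auto task = std::make_shared<std::packaged_task<int()>>(std::move(fn));
        std::future<int> fut = task->get_future();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push([task] { (*task)(); });
        }
        cv_.notify_one();
        return fut;
    }

private:
    void worker_loop() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
                if (queue_.empty()) return;  // stopping and nothing left to run
                job = std::move(queue_.front());
                queue_.pop();
            }
            job();  // run outside the lock so other workers can proceed
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> queue_;
    std::vector<std::thread> workers_;
    bool stopping_ = false;
};

// Runs total_time steps; returns the sum of task results for each step.
std::vector<int> run_time_steps(std::size_t num_threads, int total_time,
                                int num_small_tasks) {
    SimplePool pool(num_threads);  // threads live across all time steps
    std::vector<int> step_sums;
    for (int theTime = 0; theTime != total_time; ++theTime) {
        std::vector<std::future<int>> pending;  // fresh list per time step
        for (int i = 0; i < num_small_tasks; ++i)
            pending.push_back(
                pool.post([i, theTime] { return process_data(i, theTime); }));
        int sum = 0;
        for (std::future<int>& f : pending)
            sum += f.get();  // blocks until that task has finished
        step_sums.push_back(sum);
    }
    return step_sums;
}
```

Calling run_time_steps(4, 3, 10) runs three time steps of ten tasks each on four reusable threads. Note that the list of pending futures is rebuilt inside the time-step loop, so each step waits only on its own tasks.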
Rost answered Nov 06 '22 03:11
