Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

boost::asio async condition

The idea is to be able to replace multithreaded code with boost::asio and a thread pool, on a consumer/producer problem. Currently, each consumer thread waits on a boost::condition_variable - when a producer adds something to the queue, it calls notify_one/notify_all to notify all the consumers. Now what happens when you (potentially) have 1k+ consumers? Threads won't scale!

I decided to use boost::asio, but then I ran into the fact that it doesn't have condition variables. And then async_condition_variable was born:

class async_condition_variable
{
private:
    boost::asio::io_service& service_;
    typedef boost::function<void ()> async_handler;
    std::queue<async_handler> waiters_;

public:
    async_condition_variable(boost::asio::io_service& service) : service_(service)
    {
    }

    void async_wait(async_handler handler)
    {
        waiters_.push(handler);
    }

    void notify_one()
    {
        service_.post(waiters_.front());
        waiters_.pop();
    }

    void notify_all()
    {
        while (!waiters_.empty()) {
            notify_one();
        }
    }
};

Basically, each consumer would call async_condition_variable::wait(...). Then, a producer would eventually call async_condition_variable::notify_one() or async_condition_variable::notify_all(). Each consumer's handle would be called, and would either act on the condition or call async_condition_variable::wait(...) again. Is this feasible or am I being crazy here? What kind of locking (mutexes) should be performed, given the fact that this would be run on a thread pool?

P.S.: Yes, this is more a RFC (Request for Comments) than a question :).

like image 571
bruno nery Avatar asked Jul 09 '12 23:07

bruno nery


People also ask

What is boost :: asio :: Post?

boost::asio::post takes any callable object. Requirements for such object you can find here. There are many ways to achive what you want: [1] lambda expressions boost::asio::post(tp, [i]{ printProduct(i); });

What is boost :: asio :: io_service?

Asio defines boost::asio::io_service , a single class for an I/O service object. Every program based on Boost. Asio uses an object of type boost::asio::io_service . This can also be a global variable. While there is only one class for an I/O service object, several classes for I/O objects exist.

What is C++ boost asio?

Boost. Asio is a cross-platform C++ library for network and low-level I/O programming that provides developers with a consistent asynchronous model using a modern C++ approach. Overview. An overview of the features included in Boost. Asio, plus rationale and design information.

How does boost asio work?

At its core, Boost Asio provides a task execution framework that you can use to perform operations of any kind. You create your tasks as function objects and post them to a task queue maintained by Boost Asio. You enlist one or more threads to pick these tasks (function objects) and invoke them.


2 Answers

Have a list of things that need to be done when an event occurs. Have a function to add something to that list and a function to remove something from that list. Then, when the event occurs, have a pool of threads work on the list of jobs that now need to be done. You don't need threads specifically waiting for the event.

like image 158
David Schwartz Avatar answered Sep 30 '22 20:09

David Schwartz


Boost::asio can be kind of hard to wrap your head around. At least, I have difficult time doing it.

You don't need to have the threads wait on anything. They do that on their own when they don't have any work to do. The examples that seemed to look like what you wanted to do had work posted to the io_service for each item.

The following code was inspired from this link. It actually open my eyes to how you could use it do a lot of things.

I'm sure this isn't perfect, but I think it gives the general idea. I hope this helps.

Code

#include <iostream>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
class ServerProcessor
{
protected:
    void handleWork1(WorkObject1* work)
    {
        //The code to do task 1 goes in here
    }
    void handleWork2(WorkObject2* work)
    {
        //The code to do task 2 goes in here
    }

    boost::thread_group worker_threads_;

    boost::asio::io_service io_service_;
    //This is used to keep io_service from running out of work and exiting to soon.
    boost::shared_ptr<boost::asio::io_service::work> work_;


public:
    void start(int numberOfThreads)
    {
        boost::shared_ptr<boost::asio::io_service::work> myWork(new boost::asio::io_service::work(io_service_));
        work_=myWork;

        for (int x=0; x < numberOfThreads; ++x)
            worker_threads_.create_thread( boost::bind( &ServerProcessor::threadAction, this ) );

    }

    void doWork1(WorkObject1* work)
    {
        io_service_.post(boost::bind(&ServerProcessor::handleWork1, this, work));
    }

    void doWork2(WorkObject2* work)
    {
        io_service_.post(boost::bind(&ServerProcessor::handleWork2, this, work));
    }


    void threadAction()
    {
        io_service_.run();
    }

    void stop()
    {
        work_.reset();
        io_service_.stop();
        worker_threads_.join_all();
    }

};

int main()
{
    ServerProcessor s;

    std::string input;
    std::cout<<"Press f to stop"<<std::endl;

    s.start(8);

    std::cin>>input;

    s.stop();

    return 0;
}
like image 21
Thomas Lann Avatar answered Sep 30 '22 20:09

Thomas Lann