
What is a good way to handle multithreading with Poco SocketReactor?

I'm researching alternatives for implementing a high-volume client/server system, and I'm currently looking at Poco's Reactor framework, since I already use Poco for much of my application framework.

The incoming packet sizes are going to be pretty small, so I think it will work fine from the perspective of reading the data from the clients. But the operations that will be performed based on the client input will be relatively expensive and may need to be offloaded to another process or even another server. And the responses sent back to the client will sometimes be fairly large. So obviously I can't block the reactor thread while that is taking place.

So I'm thinking it would work better if I just read the data in the reactor event handler and then pass it to another thread (or thread pool) that processes it.
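The hand-off described above could be sketched roughly as follows. This is a minimal illustration using only the C++ standard library, not Poco's own classes; WorkerPool and submit() are hypothetical names. In a real Poco application, Poco::ThreadPool or a Poco::NotificationQueue might fill the same role.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical minimal worker pool (not a Poco class).
class WorkerPool
{
public:
    explicit WorkerPool(std::size_t threads)
    {
        for (std::size_t i = 0; i < threads; ++i)
            _workers.emplace_back([this] { workLoop(); });
    }

    // Lets the workers drain the remaining jobs, then joins them.
    ~WorkerPool()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _stopping = true;
        }
        _ready.notify_all();
        for (auto& t : _workers) t.join();
    }

    // Meant to be called from the reactor thread: queues the job and returns
    // immediately, so the reactor is never blocked by the expensive work.
    void submit(std::function<void()> job)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _jobs.push(std::move(job));
        }
        _ready.notify_one();
    }

private:
    void workLoop()
    {
        for (;;)
        {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _ready.wait(lock, [this] { return _stopping || !_jobs.empty(); });
                if (_jobs.empty()) return; // stopping and fully drained
                job = std::move(_jobs.front());
                _jobs.pop();
            }
            job(); // the expensive work runs outside the lock
        }
    }

    std::mutex _mutex;
    std::condition_variable _ready;
    std::queue<std::function<void()>> _jobs;
    std::vector<std::thread> _workers;
    bool _stopping = false;
};
```

In this sketch the readable handler would copy the received bytes into the job (for example by capturing a std::string by value) and call submit(), leaving the reactor thread free to service other sockets.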

What I'm not too sure about is the process for sending the responses back to the client when the operations are complete.

I can't find too much information about the best ways to use the framework. But I've done some testing and it looks like the reactor will fire the WritableNotification event repeatedly while the socket is writable. So would the optimal process be to queue up the data that needs to be sent in the object that receives the WritableNotification events and send small chunks each time the event is received?
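One way to picture the "queue it up and send a small chunk per event" idea is the sketch below. It is an assumption-laden illustration, not Poco code: SendBuffer and SendFn are hypothetical names, and SendFn stands in for a non-blocking send call such as StreamSocket::sendBytes(), which may accept fewer bytes than requested.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical stand-in for a non-blocking send: returns the number of
// bytes actually accepted, which may be fewer than length.
using SendFn = std::function<int(const char* data, int length)>;

// Hypothetical helper (not a Poco class) holding data waiting to be sent.
class SendBuffer
{
public:
    void append(const std::string& data) { _pending += data; }

    bool empty() const { return _offset == _pending.size(); }

    // Intended to be called once per writable event. Offers at most maxChunk
    // bytes to send() and advances only by what was actually accepted.
    void flushChunk(const SendFn& send, std::size_t maxChunk)
    {
        if (empty()) return;
        std::size_t want = std::min(maxChunk, _pending.size() - _offset);
        int sent = send(_pending.data() + _offset, static_cast<int>(want));
        if (sent > 0) _offset += static_cast<std::size_t>(sent);
        if (empty())
        {
            // Everything has gone out: release the memory.
            _pending.clear();
            _offset = 0;
        }
    }

private:
    std::string _pending;    // bytes queued for this connection
    std::size_t _offset = 0; // how many of them have been sent so far
};
```

Tracking an offset rather than erasing from the front of the string avoids copying the remaining data after every partial send.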

Update: So when I started testing this I was horrified to discover that server CPU usage went up to 100% on the CPU the server app was running on with a single connection. But after some digging I found what I was doing wrong. I discovered that I don't need to register for WritableNotification events when the service handler is created, I only need to register when I have data to send. Then once all of the data is sent, I should unregister the event handler. This way the reactor doesn't have to keep calling the event handlers over and over when there is nothing to send. Now my CPU usage stays close to 0 even with 100 connections. Whew!
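The register-only-while-there-is-data rule from the update could be captured along these lines. This is a sketch under assumptions: WriteInterest, post(), next(), and the arm/disarm callbacks are hypothetical names. In a Poco handler, arm and disarm would presumably wrap addEventHandler() and removeEventHandler() for WritableNotification.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <string>

// Hypothetical helper (not a Poco class). It keeps the "writable" interest
// registered only while there is queued data to send.
class WriteInterest
{
public:
    WriteInterest(std::function<void()> arm, std::function<void()> disarm):
        _arm(std::move(arm)),
        _disarm(std::move(disarm))
    {
    }

    // May be called from any thread, e.g. a worker that finished a job.
    // Registers for writable events only on the empty -> non-empty transition.
    void post(std::string msg)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.push_back(std::move(msg));
        if (!_armed) { _armed = true; _arm(); }
    }

    // Called from the writable handler. Hands out the next message, and
    // unregisters as soon as the queue is drained so the reactor stops
    // firing writable events with nothing to send.
    bool next(std::string& out)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_queue.empty())
        {
            if (_armed) { _armed = false; _disarm(); }
            return false;
        }
        out = std::move(_queue.front());
        _queue.pop_front();
        if (_queue.empty()) { _armed = false; _disarm(); }
        return true;
    }

private:
    std::mutex _mutex;
    std::deque<std::string> _queue;
    std::function<void()> _arm;
    std::function<void()> _disarm;
    bool _armed = false;
};
```

The callbacks run while the mutex is held, which keeps the sketch simple but means they must not call back into the same WriteInterest object.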

Gerald asked Oct 10 '22 08:10


1 Answer

I wrote a class, ServerConnector, copied from SocketConnector, except that it does not call connect() on the socket, because the socket is already connected. If a reactor is started with a ServiceHandler for notifications inside the run() function of a TCPServerConnection, the TCPServer class runs it on a new thread. That gives me a multithreaded reactor pattern, but I do not know whether this is the best way to do it.

class ServerConnector

template <class ServiceHandler>
class ServerConnector
{
public:     
    explicit ServerConnector(StreamSocket& ss):
        _socket(ss),
        _pReactor(0)
        /// Creates a ServerConnector, using the given StreamSocket.
    {
    }

    ServerConnector(StreamSocket& ss, SocketReactor& reactor):
        _socket(ss),
        _pReactor(0)
        /// Creates a ServerConnector, using the given StreamSocket.
        /// The ServerConnector registers itself with the given SocketReactor.
    {
        registerConnector(reactor);
        onConnect();
    }

    virtual ~ServerConnector()
        /// Destroys the ServerConnector.
    {
        unregisterConnector();
    }

//
// the rest is the same as in SocketConnector
//

private:
    ServerConnector();
    ServerConnector(const ServerConnector&);
    ServerConnector& operator = (const ServerConnector&);

    StreamSocket&   _socket;
    SocketReactor* _pReactor;
};

The echo service is an ordinary ServiceHandler:

class EchoServiceHandler
{
public:
    EchoServiceHandler(StreamSocket& socket, SocketReactor& reactor):
        _socket(socket),
        _reactor(reactor)
    {
        _reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
        _reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ErrorNotification>(*this, &EchoServiceHandler::onError));
    }

    ~EchoServiceHandler()
    {
        _reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ErrorNotification>(*this, &EchoServiceHandler::onError));
        _reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
    }

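    void onReadable(ReadableNotification* pNf)
    {
        pNf->release();
        char buffer[4096];
        bool failed = false;
        try {
            int n = _socket.receiveBytes(buffer, sizeof(buffer));
            if (n > 0)
            {
                // Echo the data back. sendBytes() may send fewer than n bytes.
                _socket.sendBytes(buffer, n);
            } else
                failed = true; // 0 bytes means the peer closed the connection
        } catch( ... ) {
            failed = true;
        }
        // Call onError() outside the try block: it deletes this object, so it
        // must run at most once.
        if (failed)
            onError();
    }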

    void onError(ErrorNotification* pNf)
    {
        pNf->release();
        onError();
    }

    void onError()
    {
        _socket.shutdown();
        _socket.close();
        _reactor.stop();
        delete this;
    }

private:
    StreamSocket   _socket;
    SocketReactor& _reactor;
};

EchoReactorConnection works with the TCPServer class to run the reactor in its own thread:

class EchoReactorConnection: public TCPServerConnection
{
public:
    EchoReactorConnection(const StreamSocket& s): TCPServerConnection(s)
    {
    }

    void run()
    {
        StreamSocket& ss = socket();
        SocketReactor reactor;

        ServerConnector<EchoServiceHandler> sc(ss, reactor);
        reactor.run();
        std::cout << "exit EchoReactorConnection thread" << std::endl;
    }
};

The CppUnit test case is the same as TCPServerTest::testMultiConnections, but uses EchoReactorConnection so that each connection runs its own reactor thread.

void TCPServerTest::testMultithreadReactor()
{
    ServerSocket svs(0);
    TCPServerParams* pParams = new TCPServerParams;
    pParams->setMaxThreads(4);
    pParams->setMaxQueued(4);
    pParams->setThreadIdleTime(100);

    TCPServer srv(new TCPServerConnectionFactoryImpl<EchoReactorConnection>(), svs, pParams);
    srv.start();

    assert (srv.currentConnections() == 0);
    assert (srv.currentThreads() == 0);
    assert (srv.queuedConnections() == 0);
    assert (srv.totalConnections() == 0);

    //
    // same as TCPServerTest::testMultiConnections()
    //
    // ....
    ///
}
Sean answered Oct 28 '22 05:10