 

boost::asio::socket thread safety


(This is a simplified version of my original question.)

I have several threads that write to a boost asio socket. This seems to work very well, with no problems.

The documentation says a shared socket is not thread safe (here, way down at the bottom), so I am wondering if I should protect the socket with a mutex, or something.

This question insists that protection is necessary, but gives no advice on how to do so.
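For reference, one common form of such protection is a mutex held for the duration of each whole-message write. The following is only a minimal sketch of that idea, not code from the question or from Asio: `FakeSocket` and `LockedWriter` are hypothetical names, and the fake socket stands in for a real one so the example runs without a network. With a real socket, the loop inside `write` would presumably be replaced by a single `boost::asio::write` call made under the same lock.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical stand-in for a socket: records bytes in arrival order and
// accepts at most 8 bytes per call, imitating short writes.
struct FakeSocket {
    std::string received;
    std::size_t write_some(const char* data, std::size_t len) {
        const std::size_t n = len < 8 ? len : 8;
        received.append(data, n);
        return n;
    }
};

// Holds one mutex per stream so that a whole message is written before
// another thread may start its own message.
template <typename Stream>
class LockedWriter {
public:
    explicit LockedWriter(Stream& stream) : stream_(stream) {}

    void write(const std::string& msg) {
        std::lock_guard<std::mutex> lock(mutex_);
        std::size_t sent = 0;
        while (sent < msg.size()) {
            sent += stream_.write_some(msg.data() + sent, msg.size() - sent);
        }
    }

private:
    Stream& stream_;
    std::mutex mutex_;
};

// Two threads write 1000 messages each; returns true if the recorded
// stream consists only of complete, unmixed messages.
bool run_locked_writer_demo() {
    const std::string m1 = "speaker1: hello, server, how are you running?\n";
    const std::string m2 = "speaker2: hello, server, how are you running?\n";
    FakeSocket sock;
    LockedWriter<FakeSocket> writer(sock);

    auto speak = [&writer](const std::string& msg) {
        for (int k = 0; k < 1000; ++k) writer.write(msg);
    };
    std::thread t1([&] { speak(m1); });
    std::thread t2([&] { speak(m2); });
    t1.join();
    t2.join();

    assert(sock.received.size() == 1000 * (m1.size() + m2.size()));
    std::size_t pos = 0;
    while (pos < sock.received.size()) {
        if (sock.received.compare(pos, m1.size(), m1) != 0 &&
            sock.received.compare(pos, m2.size(), m2) != 0) {
            return false;
        }
        pos += m1.size(); // both messages have the same length
    }
    return true;
}
```

The lock covers the whole loop rather than each `write_some` call, because locking per chunk would still let two messages mix on the wire.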

All the answers to my original question also insisted that what I was doing was dangerous, and most urged me to replace my writes with async_writes or even more complicated things. However, I am reluctant to do this, since it would complicate code that is already working, and none of the answerers convinced me they knew what they were talking about. They seemed to have read the same documentation as I had and were guessing, just as I was.

So, I wrote a simple program to stress test writing to a shared socket from two threads.

Here is the server, which simply writes out whatever it receives from the client:

    #include <boost/asio.hpp>
    #include <cstddef>
    #include <iostream>

    using boost::asio::ip::tcp;

    int main() {
        boost::asio::io_service io_service;

        tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 3001));

        tcp::socket socket(io_service);
        acceptor.accept(socket);

        for (;;)
        {
            char mybuffer[1256];
            // Read at most sizeof(mybuffer) - 1 bytes so the terminator always fits.
            std::size_t len = socket.read_some(
                boost::asio::buffer(mybuffer, sizeof(mybuffer) - 1));
            mybuffer[len] = '\0';
            std::cout << mybuffer;
            std::cout.flush();
        }

        return 0;
    }

Here is the client, which creates two threads that write to a shared socket as fast as they can:

    #include <boost/asio.hpp>
    #include <boost/thread.hpp>
    #include <iostream>
    #include <string>

    using boost::asio::ip::tcp;

    // Shared by both writer threads, deliberately without any locking.
    tcp::socket* psocket;

    void speaker1() {
        std::string msg("speaker1: hello, server, how are you running?\n");
        for (int k = 0; k < 1000; k++) {
            boost::asio::write(
                *psocket, boost::asio::buffer(msg, msg.length()));
        }
    }

    void speaker2() {
        std::string msg("speaker2: hello, server, how are you running?\n");
        for (int k = 0; k < 1000; k++) {
            boost::asio::write(
                *psocket, boost::asio::buffer(msg, msg.length()));
        }
    }

    int main(int argc, char* argv[]) {
        boost::asio::io_service io_service;

        // connect to server
        tcp::resolver resolver(io_service);
        tcp::resolver::query query("localhost", "3001");
        tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
        tcp::resolver::iterator end;
        psocket = new tcp::socket(io_service);
        boost::system::error_code error = boost::asio::error::host_not_found;
        while (error && endpoint_iterator != end)
        {
            psocket->close();
            psocket->connect(*endpoint_iterator++, error);
        }
        if (error) {
            std::cerr << "connect failed: " << error.message() << "\n";
            return 1;
        }

        boost::thread t1(speaker1);
        boost::thread t2(speaker2);

        // Wait for both writers to finish.
        t1.join();
        t2.join();
        return 0;
    }

This works! Perfectly, as far as I can tell. The client does not crash. The messages arrive at the server without garbles. They usually arrive alternately, one from each thread. Sometimes one thread gets two or three messages in before the other, but I do not think this is a problem so long as there are no garbles and all the messages arrive.
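Checking 2000 lines of output by eye is error-prone, so the "no garbles, all messages arrive" claim could also be checked mechanically. The sketch below assumes the server appends everything it reads to a `std::string` instead of printing it. That collection step, the `Tally` struct, and the function names are hypothetical and not part of the test program above.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>

// Counts of complete messages per speaker, plus any line that matches neither.
struct Tally {
    std::size_t speaker1 = 0;
    std::size_t speaker2 = 0;
    std::size_t garbled = 0;
};

// Splits the received byte stream into lines and classifies each one.
// A trailing partial line (no final newline) is counted as garbled.
Tally tally_messages(const std::string& received) {
    const std::string m1 = "speaker1: hello, server, how are you running?";
    const std::string m2 = "speaker2: hello, server, how are you running?";
    Tally t;
    std::istringstream in(received);
    std::string line;
    while (std::getline(in, line)) {
        if (line == m1) {
            ++t.speaker1;
        } else if (line == m2) {
            ++t.speaker2;
        } else {
            ++t.garbled;
        }
    }
    return t;
}

// Example use on a hand-written stream containing one message from each speaker.
void tally_example() {
    const Tally t = tally_messages(
        "speaker1: hello, server, how are you running?\n"
        "speaker2: hello, server, how are you running?\n");
    assert(t.speaker1 == 1 && t.speaker2 == 1 && t.garbled == 0);
}
```

For the test above, a clean run would be expected to give 1000 for each speaker and zero garbled lines. Note that a clean run only shows that this particular run did not fail.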

My conclusion: the socket may not be thread safe in some theoretical sense, but it is so hard to make it fail that I am not going to worry about it.

ravenspoint asked Sep 09 '11 14:09



2 Answers

After restudying the code for async_write I am now convinced that any write operation is thread safe if and only if the packet size is smaller than

default_max_transfer_size = 65536; 

What happens is that as soon as async_write is called, it calls async_write_some in the same thread. After that, any thread in the pool that is running some form of io_service::run will keep calling async_write_some for that write operation until it completes.

These async_write_some calls can and will interleave if it has to be called more than once (the packets are larger than 65536).
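To picture what such interleaving does to the byte stream, here is a toy model. It does not use Asio and does not reproduce its internals or its scheduling. It simply assumes the worst case, in which two composed writes take strict turns sending chunks of at most `max_chunk` bytes. The function name and parameters are made up for this illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Models two multi-chunk writes to one stream that alternate chunk by chunk.
// Returns the bytes in the order they would reach the wire in this model.
std::string interleave_chunks(const std::string& a, const std::string& b,
                              std::size_t max_chunk) {
    assert(max_chunk > 0);
    std::string wire;
    std::size_t ia = 0;
    std::size_t ib = 0;
    while (ia < a.size() || ib < b.size()) {
        // Next chunk of the first message (may be empty once it is finished).
        const std::size_t na = std::min(max_chunk, a.size() - ia);
        wire.append(a, ia, na);
        ia += na;
        // Next chunk of the second message.
        const std::size_t nb = std::min(max_chunk, b.size() - ib);
        wire.append(b, ib, nb);
        ib += nb;
    }
    return wire;
}
```

In this model, `interleave_chunks("AAAA", "BBBB", 4)` yields `AAAABBBB`, since each message fits in one chunk, while a limit of 2 yields `AABBAABB`, where neither message arrives intact.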

ASIO does not queue writes to a socket as you might expect, with one finishing before the next begins. To make writes both thread safe and free of interleaving, consider the following piece of code:

    void my_connection::async_serialized_write(
            boost::shared_ptr<transmission> outpacket) {
        m_tx_mutex.lock();
        // A write is already in flight if the queue was non-empty before this push.
        bool in_progress = !m_pending_transmissions.empty();
        m_pending_transmissions.push(outpacket);
        if (!in_progress) {
            if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
                boost::asio::async_write(m_socket,
                    m_pending_transmissions.front()->scatter_buffers,
                    boost::asio::transfer_all(),
                    boost::bind(
                        &my_connection::handle_async_serialized_write,
                        shared_from_this(),
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
            } else { // Send single buffer
                boost::asio::async_write(m_socket,
                    boost::asio::buffer(
                        m_pending_transmissions.front()->buffer_references.front(),
                        m_pending_transmissions.front()->num_bytes_left),
                    boost::asio::transfer_all(),
                    boost::bind(
                        &my_connection::handle_async_serialized_write,
                        shared_from_this(),
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
            }
        }
        m_tx_mutex.unlock();
    }

    void my_connection::handle_async_serialized_write(
            const boost::system::error_code& e, size_t bytes_transferred) {
        if (!e) {
            boost::shared_ptr<transmission> transmission;
            m_tx_mutex.lock();
            // The write that just completed is at the front of the queue.
            transmission = m_pending_transmissions.front();
            m_pending_transmissions.pop();
            // Start the next queued write, if any, before releasing the lock.
            if (!m_pending_transmissions.empty()) {
                if (m_pending_transmissions.front()->scatter_buffers.size() > 0) {
                    boost::asio::async_write(m_socket,
                        m_pending_transmissions.front()->scatter_buffers,
                        boost::asio::transfer_exactly(
                            m_pending_transmissions.front()->num_bytes_left),
                        boost::bind(
                            &my_connection::handle_async_serialized_write,
                            shared_from_this(),
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
                } else { // Send single buffer
                    boost::asio::async_write(m_socket,
                        boost::asio::buffer(
                            m_pending_transmissions.front()->buffer_references.front(),
                            m_pending_transmissions.front()->num_bytes_left),
                        boost::asio::transfer_all(),
                        boost::bind(
                            &my_connection::handle_async_serialized_write,
                            shared_from_this(),
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred));
                }
            }
            m_tx_mutex.unlock();
            // Call the caller's handler outside the lock.
            transmission->handler(e, bytes_transferred, transmission);
        } else {
            MYLOG_ERROR(
                m_connection_oid.toString() << " "
                << "handle_async_serialized_write: " << e.message());
            stop(connection_stop_reasons::stop_async_handler_error);
        }
    }

This basically makes a queue that sends one packet at a time. The next async_write is issued only after the previous write has succeeded, and the completion handler then calls the original handler for the write that just finished.

It would have been easier if asio made write queues automatic per socket/stream.

Climax answered Dec 08 '22 00:12


Use a boost::asio::io_service::strand for asynchronous handlers that are not thread safe.

A strand is defined as a strictly sequential invocation of event handlers (i.e. no concurrent invocation). Use of strands allows execution of code in a multithreaded program without the need for explicit locking (e.g. using mutexes).

The timer tutorial is probably the easiest way to wrap your head around strands.

Sam Miller answered Dec 07 '22 23:12