 

How strands guarantee correct execution of pending events in boost.asio

Consider an echo server implemented using Boost.asio. Read events from connected clients result in blocks of data being placed on to an arrival event queue. A pool of threads works through these events - for each event, a thread takes the data in the event and echos it back to the connected client.

[Diagram: connected clients → arrival event queue → pool of threads echoing data back to clients]

As shown in the diagram above, there could be multiple events in the event queue, all from a single client. In order to ensure that the events for a given client are executed and delivered in order, strands are used. In this case, all events from a given connected client will be executed in a strand for that client.

My question is: how do strands guarantee the correct order of processing of events? I presume there must be some kind of lock-per-strand, but even that won't be sufficient, so there must be more to it. I was hoping someone could explain it or point me to some code which does this?

I found this document: How strands work and why you should use them

It sheds some light on the mechanism, but says that in a strand "Handler execution order is not guaranteed". Does that mean that we could end up receiving back "Strawberry forever. fields"?

Also - whenever a new client connects, do we have to create a new strand, so that there is one strand per client?

Finally - when a read event arrives, how do we know which strand to add it to? Does the strand have to be looked up from all strands, using the connection as a key?

asked Aug 23 '16 by SlappyTheFish

1 Answer

strand provides a guarantee for non-concurrency and the invocation order of handlers; strand does not control the order in which operations are executed and demultiplexed. Use a strand if you have either:

  • multiple threads accessing a shared object that is not thread safe
  • a need for a guaranteed sequential ordering of handlers

The io_service provides the desired and expected behavior: buffers are filled or used in the order in which the operations are initiated. For instance, if the socket has "Strawberry fields forever." available to be read, then given:

buffer1.resize(11); // buffer is a std::vector managed elsewhere
buffer2.resize(7);  // buffer is a std::vector managed elsewhere
buffer3.resize(8);  // buffer is a std::vector managed elsewhere
socket.async_read_some(boost::asio::buffer(buffer1), handler1);
socket.async_read_some(boost::asio::buffer(buffer2), handler2);
socket.async_read_some(boost::asio::buffer(buffer3), handler3);

When the operations complete:

  • handler1 is invoked, buffer1 will contain "Strawberry "
  • handler2 is invoked, buffer2 will contain "fields "
  • handler3 is invoked, buffer3 will contain "forever."

However, the order in which the completion handlers are invoked is unspecified. This unspecified ordering remains true even with a strand.


Operation Demultiplexing

Asio uses the Proactor design pattern[1] to demultiplex operations. On most platforms, this is implemented in terms of a Reactor. The official documentation mentions the components and their responsibilities. Consider the following example:

socket.async_read_some(buffer, handler);

The caller is the initiator, starting an async_read_some asynchronous operation and creating the handler completion handler. The asynchronous operation is executed by the StreamSocketService operation processor:

  • Within the initiating function, if the socket has no other outstanding asynchronous read operations and data is available, then StreamSocketService will read from the socket and enqueue the handler completion handler into the io_service
  • Otherwise, the read operation is queued onto the socket, and the reactor is informed to notify Asio once data becomes available on the socket. When the io_service is run and data is available on the socket, the reactor will inform Asio. Next, Asio will dequeue an outstanding read operation from the socket, execute it, and enqueue the handler completion handler into the io_service

The io_service proactor will dequeue a completion handler and demultiplex it to one of the threads that are running the io_service, on which the handler completion handler will be executed. The order of invocation of the completion handlers is unspecified.
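The demultiplexing step above can be sketched with a plain standard-library queue standing in for the io_service. This is only an illustrative sketch under stated assumptions, not Asio's actual internals; handler_queue and its members are hypothetical names:

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <vector>

// Hypothetical sketch of the demultiplexing step only: completion handlers
// are enqueued into a shared queue (standing in for the io_service), and any
// thread running the loop may dequeue and execute them.
class handler_queue {
public:
    void post(std::function<void()> handler) {
        std::lock_guard<std::mutex> lock(mutex_);
        handlers_.push_back(std::move(handler));
        ready_.notify_one();
    }

    void stop() {
        std::lock_guard<std::mutex> lock(mutex_);
        stopped_ = true;
        ready_.notify_all();
    }

    // run() is the analogue of io_service::run(): every thread calling it
    // becomes part of the pool that completion handlers are demultiplexed to.
    void run() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mutex_);
            ready_.wait(lock, [this] { return stopped_ || !handlers_.empty(); });
            if (handlers_.empty()) return;  // stopped and fully drained
            auto handler = std::move(handlers_.front());
            handlers_.pop_front();
            lock.unlock();
            handler();  // executed on whichever thread dequeued it
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<std::function<void()>> handlers_;
    bool stopped_ = false;
};
```

With several threads calling run() concurrently, each handler runs exactly once on some pool thread, and nothing constrains which thread runs which handler or in what order they complete - which is why a strand is needed for per-client ordering.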

Multiple Operations

If multiple operations of the same type are initiated on a socket, the order in which the buffers will be used or filled is currently unspecified. However, in the current implementation, each socket uses a FIFO queue for each type of pending operation (e.g. a queue for read operations; a queue for write operations; etc.). The networking-ts draft, which is based partially on Asio, specifies:

the buffers are filled in the order in which these operations were issued. The order of invocation of the completion handlers for these operations is unspecified.

Given:

socket.async_read_some(buffer1, handler1); // op1
socket.async_read_some(buffer2, handler2); // op2

As op1 was initiated before op2, then buffer1 is guaranteed to contain data that was received earlier in the stream than the data contained in buffer2, but handler2 may be invoked before handler1.

Composed Operations

Composed operations are composed of zero or more intermediate operations. For example, the async_read() composed asynchronous operation is composed of zero or more intermediate stream.async_read_some() operations.

The current implementation uses operation chaining to create a continuation: a single async_read_some() operation is initiated, and within its internal completion handler, it determines whether to initiate another async_read_some() operation or to invoke the user's completion handler. Because of the continuation, the async_read documentation requires that no other reads occur until the composed operation completes:

The program must ensure that the stream performs no other read operations (such as async_read, the stream's async_read_some function, or any other composed operations that perform reads) until this operation completes.

If a program violates this requirement, it may observe interleaved data, because of the aforementioned order in which buffers are filled.

For a concrete example, consider the case where an async_read() operation is initiated to read 26 bytes of data from a socket:

buffer.resize(26); // buffer is a std::vector managed elsewhere
boost::asio::async_read(socket, boost::asio::buffer(buffer), handler);

If the socket receives "Strawberry ", "fields ", and then "forever.", then the async_read() operation may be composed of one or more socket.async_read_some() operations. For instance, it could be composed of 3 intermediate operations:

  • The first async_read_some() operation reads 11 bytes containing "Strawberry " into the buffer starting at an offset of 0. The completion condition of reading 26 bytes has not been satisfied, so another async_read_some() operation is initiated to continue the operation
  • The second async_read_some() operation reads 7 bytes containing "fields " into the buffer starting at an offset of 11. The completion condition of reading 26 bytes has not been satisfied, so another async_read_some() operation is initiated to continue the operation
  • The third async_read_some() operation reads 8 bytes containing "forever." into the buffer starting at an offset of 18. The completion condition of reading 26 bytes has been satisfied, so handler is enqueued into the io_service

When the handler completion handler is invoked, buffer contains "Strawberry fields forever."
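The continuation driving those three intermediate reads can be sketched as follows. This is illustrative pseudo-Asio, not the real implementation; mock_stream and composed_read are hypothetical names, and the mock delivers exactly the three chunks from the example:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>

// Hypothetical stream that delivers data in the same three chunks as the
// example above.
struct mock_stream {
    std::deque<std::string> chunks{"Strawberry ", "fields ", "forever."};

    // Analogue of async_read_some(): fills as much of the buffer, starting at
    // offset, as the next chunk allows, then invokes the handler with the
    // number of bytes transferred.
    void read_some(std::string& out, std::size_t offset,
                   std::function<void(std::size_t)> handler) {
        std::string chunk = std::move(chunks.front());
        chunks.pop_front();
        std::size_t n = std::min(chunk.size(), out.size() - offset);
        std::copy_n(chunk.begin(), n, out.begin() + offset);
        handler(n);
    }
};

// The composed operation, playing the role of async_read(): its internal
// completion handler checks the completion condition (buffer full) and either
// initiates another intermediate read or invokes the user's handler.
void composed_read(mock_stream& stream, std::string& buffer,
                   std::size_t offset, std::function<void()> user_handler) {
    stream.read_some(buffer, offset, [&, offset](std::size_t n) {
        if (offset + n < buffer.size())
            composed_read(stream, buffer, offset + n, user_handler);  // continue chain
        else
            user_handler();  // completion condition satisfied
    });
}
```

Running composed_read over a 26-byte buffer performs three intermediate reads at offsets 0, 11, and 18, and only then invokes the user's handler, mirroring the three steps listed above.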


Strand

strand is used to provide serialized execution of handlers in a guaranteed order. Given:

  • a strand object s
  • a function object f1 that is added to strand s via s.post(), or s.dispatch() when s.running_in_this_thread() == false
  • a function object f2 that is added to strand s via s.post(), or s.dispatch() when s.running_in_this_thread() == false

then the strand provides a guarantee of ordering and non-concurrency, such that f1 and f2 will not be invoked concurrently. Furthermore, if the addition of f1 happens before the addition of f2, then f1 will be invoked before f2.

With:

auto wrapped_handler1 = strand.wrap(handler1);
auto wrapped_handler2 = strand.wrap(handler2);
socket.async_read_some(buffer1, wrapped_handler1); // op1
socket.async_read_some(buffer2, wrapped_handler2); // op2

As op1 was initiated before op2, then buffer1 is guaranteed to contain data that was received earlier in the stream than the data contained in buffer2, but the order in which the wrapped_handler1 and wrapped_handler2 will be invoked is unspecified. The strand guarantees that:

  • handler1 and handler2 will not be invoked concurrently
  • if wrapped_handler1 is invoked before wrapped_handler2, then handler1 will be invoked before handler2
  • if wrapped_handler2 is invoked before wrapped_handler1, then handler2 will be invoked before handler1

Similar to the composed operation implementation, the strand implementation uses operation chaining to create a continuation. The strand manages all handlers posted to it in a FIFO queue. When the queue is empty and a handler is posted to the strand, the strand will post an internal handler to the io_service. Within the internal handler, a handler will be dequeued from the strand's FIFO queue and executed; then, if the queue is not empty, the internal handler posts itself back to the io_service.
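That chaining can be sketched with a minimal strand over a trivial executor standing in for the io_service. This is a hypothetical sketch of the technique described above, not Asio's actual code; strand, inline_executor, and their members are illustrative names:

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <vector>

// Minimal strand sketch: a FIFO queue plus a "running" flag. At most one
// internal handler is ever outstanding on the executor, so queued handlers
// never run concurrently and run in FIFO order. Executor only needs a
// post(std::function<void()>) member.
template <typename Executor>
class strand {
public:
    explicit strand(Executor& ex) : executor_(ex) {}

    void post(std::function<void()> handler) {
        bool was_idle;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(handler));
            was_idle = !running_;
            running_ = true;
        }
        if (was_idle)
            executor_.post([this] { run_one(); });  // the internal handler
    }

private:
    void run_one() {
        std::function<void()> handler;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            handler = std::move(queue_.front());
            queue_.pop_front();
        }
        handler();  // at most one queued handler executes at a time
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty())
            running_ = false;          // chain ends; strand is idle again
        else
            executor_.post([this] { run_one(); });  // continue the chain
    }

    Executor& executor_;
    std::mutex mutex_;
    std::deque<std::function<void()>> queue_;
    bool running_ = false;
};

// Trivial single-threaded executor for demonstration; run() drains the
// posted work on the calling thread.
struct inline_executor {
    std::deque<std::function<void()>> work;
    void post(std::function<void()> f) { work.push_back(std::move(f)); }
    void run() {
        while (!work.empty()) {
            auto f = std::move(work.front());
            work.pop_front();
            f();
        }
    }
};
```

Because only the single internal handler ever touches the front of the queue, the non-concurrency and FIFO guarantees hold even when many threads are running the executor.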

Consider reading this answer to find out how a composed operation uses asio_handler_invoke() to wrap intermediate handlers within the same context (i.e. strand) of the completion handler. The implementation details can be found in the comments on this question.


1. [POSA2] D. Schmidt et al., Pattern-Oriented Software Architecture, Volume 2. Wiley, 2000.

answered Oct 20 '22 by Tanner Sansbury