I have noticed the new System.IO.Pipelines and am trying to port existing stream-based code over to it. The problems with streams are well understood, but at the same time they come with a rich ecosystem of related classes.
The example provided here is a small TCP echo server: https://blogs.msdn.microsoft.com/dotnet/2018/07/09/system-io-pipelines-high-performance-io-in-net/
A snippet of the code is attached here:
private static async Task ProcessLinesAsync(Socket socket)
{
    Console.WriteLine($"[{socket.RemoteEndPoint}]: connected");
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(socket, pipe.Reader);
    await Task.WhenAll(reading, writing);
    Console.WriteLine($"[{socket.RemoteEndPoint}]: disconnected");
}

private static async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;
    while (true)
    {
        try
        {
            // Request a minimum of 512 bytes from the PipeWriter
            Memory<byte> memory = writer.GetMemory(minimumBufferSize);
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read
            writer.Advance(bytesRead);
        }
        catch
        {
            break;
        }
        // Make the data available to the PipeReader
        FlushResult result = await writer.FlushAsync();
        if (result.IsCompleted)
        {
            break;
        }
    }
    // Signal to the reader that we're done writing
    writer.Complete();
}

private static async Task ReadPipeAsync(Socket socket, PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;
        SequencePosition? position = null;
        do
        {
            // Find the EOL
            position = buffer.PositionOf((byte)'\n');
            if (position != null)
            {
                var line = buffer.Slice(0, position.Value);
                ProcessLine(socket, line);
                // This is equivalent to position + 1
                var next = buffer.GetPosition(1, position.Value);
                // Skip what we've already processed including \n
                buffer = buffer.Slice(next);
            }
        }
        while (position != null);
        // We sliced the buffer until no more data could be processed
        // Tell the PipeReader how much we consumed and how much we left to process
        reader.AdvanceTo(buffer.Start, buffer.End);
        if (result.IsCompleted)
        {
            break;
        }
    }
    reader.Complete();
}

private static void ProcessLine(Socket socket, in ReadOnlySequence<byte> buffer)
{
    if (_echo)
    {
        Console.Write($"[{socket.RemoteEndPoint}]: ");
        foreach (var segment in buffer)
        {
            Console.Write(Encoding.UTF8.GetString(segment.Span));
        }
        Console.WriteLine();
    }
}
When using streams, you could easily add SSL/TLS to your code just by wrapping it in SslStream. How is this intended to be solved with Pipelines?
Named pipes are a network protocol, just as HTTP, FTP, and SMTP are. Let's look at the .NET Framework for some quick examples:
But what if we are using a different network protocol, such as pipes? Right off the bat, we know there is nothing similar to an "HTTPS" prefix. Furthermore, we can read the documentation for System.IO.Pipelines and see that there is no "EnableSsl" method. However, in both .NET Framework and .NET Core, the SslStream class is available. This class allows you to build an SslStream out of almost any available Stream.
Also available in both .NET Framework and .NET Core is the System.IO.Pipes Namespace. The classes available in the Pipes namespace are pretty helpful.
All of these classes return some kind of object which inherits from Stream, and can thus be used in the constructor for an SslStream.
How does this relate back to the System.IO.Pipelines namespace? Well... it doesn't. None of the classes, structs, or interfaces defined in the System.IO.Pipelines namespace inherit from Stream, so we cannot use the SslStream class directly.
Instead, we have access to PipeReaders and PipeWriters. Sometimes we only have one of these available to us, but let's consider a bi-directional pipe so that we have access to both at the same time.
The System.IO.Pipelines namespace helpfully provides an IDuplexPipe interface. If we want to wrap the PipeReader and PipeWriter in an SSL stream, we will need to define a new type that implements IDuplexPipe.
In this new type:
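Here is a sketch of such a type (it assumes the SslStream has already been authenticated, and targets .NET Core 2.1 or later for the Memory<byte> overloads):
// Requires: System, System.IO.Pipelines, System.Net.Security, System.Threading.Tasks
class SslStreamDuplexPipe : IDuplexPipe
{
    private readonly SslStream sslStream;
    private readonly Pipe inputBuffer = new Pipe();
    private readonly Pipe outputBuffer = new Pipe();
    public SslStreamDuplexPipe(SslStream sslStream) => this.sslStream = sslStream;
    public PipeReader Input => inputBuffer.Reader;
    public PipeWriter Output => outputBuffer.Writer;
    // Pumps decrypted bytes from the SslStream into the input pipe
    public async Task ReadDataFromSslStreamAsync()
    {
        while (true)
        {
            // Read straight into memory owned by the pipe, not a throwaway array
            Memory<byte> memory = inputBuffer.Writer.GetMemory(2048);
            int bytes = await sslStream.ReadAsync(memory);
            if (bytes == 0) break; // the remote side closed the stream
            inputBuffer.Writer.Advance(bytes);
            FlushResult result = await inputBuffer.Writer.FlushAsync();
            if (result.IsCompleted) break; // the reader has stopped consuming
        }
        inputBuffer.Writer.Complete();
    }
    //and the reverse to write to the SslStream
}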
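The reverse direction mentioned in the closing comment could look roughly like the following. This is only a sketch under a few assumptions: the names OutputPump and CopyPipeToStreamAsync are made up for illustration, the target is .NET Core 2.1 or later, and a MemoryStream stands in for an authenticated SslStream so that the example runs without a certificate.

```csharp
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class OutputPump
{
    // Hypothetical helper: drains whatever is written to the pipe into the
    // stream, until the writing side of the pipe is completed.
    public static async Task CopyPipeToStreamAsync(PipeReader reader, Stream stream)
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            // The sequence may span several segments; write them in order.
            foreach (ReadOnlyMemory<byte> segment in buffer)
            {
                await stream.WriteAsync(segment);
            }
            await stream.FlushAsync();

            // Everything was written, so mark the whole buffer as consumed.
            reader.AdvanceTo(buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }
        reader.Complete();
    }

    public static async Task Main()
    {
        var outputBuffer = new Pipe();
        var stream = new MemoryStream(); // stand-in for an authenticated SslStream

        Task pump = CopyPipeToStreamAsync(outputBuffer.Reader, stream);

        await outputBuffer.Writer.WriteAsync(Encoding.UTF8.GetBytes("hello"));
        outputBuffer.Writer.Complete();
        await pump;

        Console.WriteLine(Encoding.UTF8.GetString(stream.ToArray()));
    }
}
```

In a real SslStreamDuplexPipe, this loop would run as a background task next to the read loop, with outputBuffer.Reader as the source and the SslStream as the destination.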
As you can see, we are still using the SslStream class from the System.Net.Security namespace; it just took us a few more steps.
Does this mean that you are basically still using streams? Yep! But, once you have fully implemented your SslStreamDuplexPipe class, you get to work with only pipes. No need to wrap an SslStream around everything.
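If you are on .NET Core 3.0 or later (System.IO.Pipelines 4.6 and up), you may not need to write the pumping loops yourself, because PipeReader.Create(Stream) and PipeWriter.Create(Stream) adapt any Stream, including an SslStream. The following is a minimal sketch, not a definitive implementation: StreamDuplexPipe is a hypothetical name, and a MemoryStream again stands in for an authenticated SslStream, which is why the demo has to rewind the stream before reading.

```csharp
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// Hypothetical adapter: exposes any Stream (for example an SslStream)
// as an IDuplexPipe using the library's own stream adapters.
public sealed class StreamDuplexPipe : IDuplexPipe
{
    public StreamDuplexPipe(Stream stream)
    {
        Input = PipeReader.Create(stream);
        Output = PipeWriter.Create(stream);
    }

    public PipeReader Input { get; }
    public PipeWriter Output { get; }
}

public static class Demo
{
    public static async Task Main()
    {
        // Stand-in for an authenticated SslStream.
        var stream = new MemoryStream();
        var duplex = new StreamDuplexPipe(stream);

        // WriteAsync copies the bytes and flushes them to the stream.
        await duplex.Output.WriteAsync(Encoding.UTF8.GetBytes("ping"));

        // Only needed because a MemoryStream is not a real duplex stream.
        stream.Position = 0;

        ReadResult result = await duplex.Input.ReadAsync();
        Console.WriteLine(Encoding.UTF8.GetString(result.Buffer.ToArray()));
        duplex.Input.AdvanceTo(result.Buffer.End);

        // By default, completing either side also disposes the stream.
        duplex.Output.Complete();
        duplex.Input.Complete();
    }
}
```

With a real connection you would pass the SslStream after AuthenticateAsClientAsync or AuthenticateAsServerAsync has completed, and the rest of the code would only see the PipeReader and PipeWriter.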
Marc Gravell wrote a much, much more detailed explanation of this. The first of the three parts can be found here: https://blog.marcgravell.com/2018/07/pipe-dreams-part-1.html
Additionally, you can read about the various .NET classes mentioned: