How to link two C# APIs that expect you to provide a stream?

I am working with two C# stream APIs, one of which is a data source and the other of which is a data sink.

Neither API actually exposes a stream object; both expect you to pass a stream into them and they handle writing/reading from the stream.

Is there a way to link these APIs together such that the output of the source is streamed into the sink without having to buffer the entire source in a MemoryStream? This is a very RAM-sensitive application.

Here's an example that uses the MemoryStream approach that I'm trying to avoid, since it buffers the entire stream in RAM before writing it out to S3:

using (var buffer = new MemoryStream())
using (var transferUtil = new TransferUtility(s3client))
{
    // Disposing the ParquetWriter finishes the file and transferUtil closes
    // the stream, so we need this weird using nesting to keep everyone happy.
    using (var parquetWriter = new ParquetWriter(schema, buffer))
    using (var rowGroupWriter = parquetWriter.CreateRowGroup())
    {
        rowGroupWriter.WriteColumn(...);
        ...
    }
    transferUtil.Upload(buffer, _bucketName, _key.Replace(".gz", "") + ".parquet");
}
Techrocket9 asked Oct 10 '18 15:10

1 Answer

You are looking for a stream that can be passed to both the data source and the sink, and that can 'transfer' the data between the two asynchronously. There are a number of possible solutions; in the past I might have considered a producer-consumer pattern built around a BlockingCollection.
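For comparison, a minimal sketch of that producer-consumer alternative might look like this (the chunk sizes and the work done on each side are placeholders, not from your APIs):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class ProducerConsumerDemo
{
    static void Main()
    {
        // A bounded capacity gives natural back pressure: Add blocks when full.
        var chunks = new BlockingCollection<byte[]>(boundedCapacity: 8);

        var producer = Task.Run(() =>
        {
            for (int i = 0; i < 4; i++)
                chunks.Add(new byte[4096]); // stand-in for chunks produced by the source API
            chunks.CompleteAdding();        // signals end-of-data to the consumer
        });

        long total = 0;
        foreach (var chunk in chunks.GetConsumingEnumerable())
            total += chunk.Length;          // stand-in for handing the chunk to the sink API
        producer.Wait();
        Console.WriteLine(total);           // 16384
    }
}
```

The drawback is that each chunk is a fresh byte[] allocation, which is what the Pipe-based approach below avoids.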

Recently, the addition of System.IO.Pipelines and the Span and Memory types has put a real focus on high-performance IO, and I think it would be a good fit here. The Pipe class, with its associated PipeReader and PipeWriter, automatically handles the flow control, back pressure and IO between the two ends, whilst utilising all the new Span- and Memory-related types.
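To show the raw Pipe mechanics before wrapping them in a stream, here is a small self-contained sketch (the payload string is made up; note that `Encoding.GetString(ReadOnlySpan<byte>)` needs .NET Core 2.1+ / .NET Standard 2.1):

```csharp
using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

class PipeDemo
{
    static async Task Main()
    {
        var pipe = new Pipe();

        // Producer: WriteAsync flushes into the pipe and applies back pressure
        // when the reader falls behind the pipe's pause threshold.
        var writer = Task.Run(async () =>
        {
            byte[] payload = Encoding.ASCII.GetBytes("hello pipelines");
            await pipe.Writer.WriteAsync(payload);
            pipe.Writer.Complete(); // no more data
        });

        // Consumer: read whatever is available, then tell the pipe how far we got.
        while (true)
        {
            ReadResult result = await pipe.Reader.ReadAsync();
            foreach (var segment in result.Buffer)
                Console.Write(Encoding.ASCII.GetString(segment.Span));
            pipe.Reader.AdvanceTo(result.Buffer.End);
            if (result.IsCompleted) break;
        }
        pipe.Reader.Complete();
        await writer;
        Console.WriteLine();
    }
}
```

The key point is that the pipe manages its own buffers internally; neither side allocates per-chunk arrays.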

I have uploaded a Gist at PipeStream that will give you a custom stream with an internal Pipe implementation which you can pass to both of your API classes. Whatever is written via WriteAsync (or Write) is made available to ReadAsync (or Read) without requiring any further byte[] or MemoryStream allocations.
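The Gist isn't reproduced here, but a minimal sketch of such a Stream facade over a Pipe might look like the following (an illustration of the idea, not the Gist's exact code; error handling and the newer Span/Memory overloads are omitted):

```csharp
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

// A readable/writable, non-seekable Stream backed by a System.IO.Pipelines.Pipe.
// Writes go to the PipeWriter; reads are served from the PipeReader.
public class PipeStream : Stream
{
    private readonly Pipe _pipe = new Pipe();

    public override bool CanRead => true;
    public override bool CanWrite => true;
    public override bool CanSeek => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
        => WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();

    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct)
        // FlushResult is awaited here, which is where back pressure is applied.
        => await _pipe.Writer.WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), ct);

    public override int Read(byte[] buffer, int offset, int count)
        => ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();

    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct)
    {
        ReadResult result = await _pipe.Reader.ReadAsync(ct);
        var sequence = result.Buffer;
        long copied = Math.Min(count, sequence.Length);
        sequence.Slice(0, copied).CopyTo(buffer.AsSpan(offset, (int)copied));
        _pipe.Reader.AdvanceTo(sequence.GetPosition(copied));
        // 0 bytes copied with a completed writer signals end-of-stream.
        return (int)copied;
    }

    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    protected override void Dispose(bool disposing)
    {
        // Completing the writer lets pending and future reads drain and then return 0.
        _pipe.Writer.Complete();
        base.Dispose(disposing);
    }
}
```

A real implementation would also override the Span/Memory-based Read/Write overloads and complete the reader on dispose.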

In your case you would simply substitute this new class for the MemoryStream and it should work out of the box. I haven't got a full S3 test working, but reading directly from the Parquet stream and dumping it to the console window shows that it works asynchronously.

// Create some very badly 'mocked' data
var idColumn = new DataColumn(
    new DataField<int>("id"),
    Enumerable.Range(0, 10000).Select(i => i).ToArray());

var cityColumn = new DataColumn(
    new DataField<string>("city"),
    Enumerable.Range(0, 10000).Select(i => i % 2 == 0 ? "London" : "Grimsby").ToArray());

var schema = new Schema(idColumn.Field, cityColumn.Field);

using (var pipeStream = new PipeStream())
{
    var readTask = Task.Run(async () =>
    {
        // transferUtil.Upload(pipeStream, "bucketName", "key"); // Execute this in a Task / Thread
        var buffer = new byte[4096];
        int read;
        while ((read = await pipeStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            var incoming = Encoding.ASCII.GetString(buffer, 0, read);
            Console.WriteLine(incoming);
            // await Task.Delay(5000); // uncomment this to simulate a very slow consumer
        }
    });

    // Disposing the ParquetWriter finishes the file and transferUtil closes the
    // stream, so we need this weird using nesting to keep everyone happy.
    using (var parquetWriter = new ParquetWriter(schema, pipeStream))
    using (var rowGroupWriter = parquetWriter.CreateRowGroup())
    {
        rowGroupWriter.WriteColumn(idColumn);   // Step through both these statements to see data read before the parquetWriter completes
        rowGroupWriter.WriteColumn(cityColumn);
    }
}

The implementation is not completely finished, but I think it shows a nice approach. In the console readTask you can uncomment the Task.Delay to simulate a slow reader (transferUtil) and you should see the pipe automatically throttle the write task.

You need to be using C# 7.2 or later (VS 2017 -> Project Properties -> Build -> Advanced -> Language Version) for one of the Span extension methods, but it should be compatible with any .NET Framework. You may also need the System.IO.Pipelines NuGet package.

The stream is readable and writable (obviously!) but not seekable, which should work for you in this scenario but wouldn't work for reading with the Parquet SDK, which requires seekable streams.

Hope it helps

SteveHayles answered Oct 16 '22 12:10