 

Asynchronously deserializing a list using System.Text.Json

Let's say that I request a large JSON file that contains a list of many objects. I don't want them to be in memory all at once, but I would rather read and process them one by one. So I need to turn an asynchronous System.IO.Stream into an IAsyncEnumerable<T>. How do I use the new System.Text.Json API to do this?

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
    {
        using (var stream = await httpResponse.Content.ReadAsStreamAsync())
        {
            // Probably do something with JsonSerializer.DeserializeAsync here without deserializing the entire thing in one go
        }
    }
}
Asked Oct 26 '19 by Rick de Water

3 Answers

TL;DR It's not trivial


Looks like someone already posted the full code for a Utf8JsonStreamReader struct that reads buffers from a stream and feeds them to a Utf8JsonReader, allowing easy deserialization with JsonSerializer.Deserialize<T>(ref newJsonReader, options);. The code isn't trivial either. The related question is here and the answer is here.
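For orientation, here is a heavily condensed sketch of the idea behind such a struct (my own illustration with hypothetical names, not the linked answer's code): carry a JsonReaderState across reads, and when a token doesn't fit in the remaining window, shift the leftover bytes to the front of the buffer and refill from the stream:

// Condensed illustration only; the linked answer's struct is more complete
// (Deserialize<T>(), ArrayPool buffers, growing the buffer when a single token
// is larger than the window). For brevity isFinalBlock is never set, which is
// fine for payloads that end with ']' or '}'.
public struct Utf8JsonStreamReaderSketch
{
    private readonly Stream _stream;
    private readonly byte[] _buffer;
    private int _offset, _length;          // window of valid, unconsumed bytes
    private JsonReaderState _state;        // carries reader state across buffers
    public JsonTokenType TokenType { get; private set; }

    public Utf8JsonStreamReaderSketch(Stream stream, int bufferSize)
    {
        _stream = stream;
        _buffer = new byte[bufferSize];
        _offset = _length = 0;
        _state = default;                  // the default JsonReaderState is a valid start state
        TokenType = JsonTokenType.None;
    }

    public bool Read()
    {
        while (true)
        {
            var reader = new Utf8JsonReader(_buffer.AsSpan(_offset, _length - _offset),
                                            isFinalBlock: false, _state);
            if (reader.Read())
            {
                _offset += (int)reader.BytesConsumed;
                _state = reader.CurrentState;
                TokenType = reader.TokenType;
                return true;
            }
            // Token incomplete: move the leftover bytes to the front and refill.
            // (A real implementation would grow the buffer if it's already full.)
            Array.Copy(_buffer, _offset, _buffer, 0, _length - _offset);
            _length -= _offset;
            _offset = 0;
            int read = _stream.Read(_buffer, _length, _buffer.Length - _length);
            if (read == 0) return false;   // end of stream
            _length += read;
        }
    }
}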

That's not enough though - HttpClient.GetAsync will return only after the entire response is received, essentially buffering everything in memory.

To avoid this, HttpClient.GetAsync(string, HttpCompletionOption) should be used with HttpCompletionOption.ResponseHeadersRead.

The deserialization loop should check the cancellation token too, and either exit or throw if it's signalled. Otherwise the loop will go on until the entire stream is received and processed.

This code is based on the related answer's example, uses HttpCompletionOption.ResponseHeadersRead and checks the cancellation token. It can parse JSON strings that contain a proper array of items, e.g.:

[{"prop1":123},{"prop1":234}]

The first call to jsonStreamReader.Read() moves to the start of the array while the second moves to the start of the first object. The loop itself terminates when the end of the array (]) is detected.

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    //Don't cache the entire response
    using var httpResponse = await httpClient.GetAsync(url,                               
                                                       HttpCompletionOption.ResponseHeadersRead,  
                                                       cancellationToken);
    using var stream = await httpResponse.Content.ReadAsStreamAsync();
    using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);

    jsonStreamReader.Read(); // move to array start
    jsonStreamReader.Read(); // move to start of the object

    while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
    {
        //Gracefully return if cancellation is requested.
        //Could be cancellationToken.ThrowIfCancellationRequested()
        if(cancellationToken.IsCancellationRequested)
        {
            return;
        }

        // deserialize object
        var obj = jsonStreamReader.Deserialize<T>();
        yield return obj;

        // JsonSerializer.Deserialize ends on last token of the object parsed,
        // move to the first token of next object
        jsonStreamReader.Read();
    }
}

JSON fragments, AKA streaming JSON, AKA NDJSON (newline-delimited JSON)

It's quite common in event streaming or logging scenarios to append individual JSON objects to a file, one element per line, e.g.:

{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}

This isn't a valid JSON document, but the individual fragments are valid. This has several advantages for big data and highly concurrent scenarios. Adding a new event only requires appending a new line to the file, not parsing and rebuilding the entire file. Processing, especially parallel processing, is easier for two reasons:

  • Individual elements can be retrieved one at a time, simply by reading one line from a stream.
  • The input file can be easily partitioned and split across line boundaries, feeding each part to a separate worker process, e.g. in a Hadoop cluster, or simply different threads in an application: calculate the split points, e.g. by dividing the length by the number of workers, then look for the first newline after each point and feed everything up to it to a separate worker (see the sketch after this list).
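As a quick illustration of that partitioning step, here is a hypothetical helper (my own sketch, not part of any answer; FindSplitPoints is an invented name) that assumes the input is a seekable file:

// Hypothetical helper: compute byte offsets that split a line-delimited file
// into roughly equal chunks, always cutting right after a newline.
static List<long> FindSplitPoints(string path, int workers)
{
    using var fs = new FileStream(path, FileMode.Open, FileAccess.Read);
    var points = new List<long> { 0 };
    long chunk = fs.Length / workers;
    for (int i = 1; i < workers; i++)
    {
        fs.Position = i * chunk;                            // jump to the approximate boundary
        int b;
        while ((b = fs.ReadByte()) != -1 && b != '\n') { }  // scan forward to the next newline
        if (fs.Position < fs.Length)
            points.Add(fs.Position);                        // worker i starts right after the newline
    }
    points.Add(fs.Length);                                  // worker i handles [points[i], points[i+1])
    return points;
}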

Using a StreamReader

The allocation-heavy way to do this would be to use a TextReader, read one line at a time and parse it with JsonSerializer.Deserialize:

using var reader = new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken
while ((line = await reader.ReadLineAsync()) != null)
{
    var item = JsonSerializer.Deserialize<T>(line);
    yield return item;

    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }
}

That's a lot simpler than the code that deserializes a proper array. There are two issues:

  • ReadLineAsync doesn't accept a cancellation token
  • Each iteration allocates a new string, one of the things we wanted to avoid by using System.Text.Json

This may be enough though, as trying to produce the ReadOnlySpan<byte> buffers needed by JsonSerializer.Deserialize isn't trivial.

Pipelines and SequenceReader

To avoid allocations, we need to get a ReadOnlySpan<byte> from the stream. Doing this requires using System.IO.Pipelines and the SequenceReader struct. Steve Gordon's An Introduction to SequenceReader explains how this class can be used to read data from a stream using delimiters.

Unfortunately, SequenceReader is a ref struct, which means it can't be used in async or iterator methods. That's why Steve Gordon in his article creates a

private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

method to read items from a ReadOnlySequence and return the ending position, so the PipeReader can resume from it. Unfortunately we want to return an IEnumerable or IAsyncEnumerable, and iterator methods don't like in or out parameters either.

We could collect the deserialized items in a List or Queue and return them as a single result, but that would still allocate lists, buffers or nodes, and would have to wait for all items in a buffer to be deserialized before returning:

private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

We need something that acts like an enumerable without requiring an iterator method, works with async, and doesn't buffer everything along the way.

Adding Channels to produce an IAsyncEnumerable

ChannelReader.ReadAllAsync returns an IAsyncEnumerable<T>. We can return a ChannelReader from methods that couldn't work as iterators and still produce a stream of elements without caching.
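To make that bridge concrete, here is a minimal, self-contained illustration (my own, not part of the answer's pipeline code) of producing items from a non-iterator method and consuming them as an IAsyncEnumerable:

// Minimal illustration: a method that isn't an iterator can still hand back
// a lazily consumed stream of items via a channel.
static ChannelReader<int> Produce()
{
    var channel = Channel.CreateUnbounded<int>();
    _ = Task.Run(async () =>
    {
        for (int i = 0; i < 3; i++)
            await channel.Writer.WriteAsync(i); // push items as they become available
        channel.Writer.Complete();              // signal that no more items are coming
    });
    return channel.Reader;
}

// Consumption: ReadAllAsync() exposes the channel as an IAsyncEnumerable<int>
// await foreach (var n in Produce().ReadAllAsync()) Console.WriteLine(n);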

Adapting Steve Gordon's code to use channels, we get the ReadItems(ChannelWriter...) and ReadLastItem methods. The first one reads one item at a time, up to a newline, into a ReadOnlySpan<byte> itemBytes, which can be passed to JsonSerializer.Deserialize. If ReadItems can't find the delimiter, it returns its position so the PipeReader can pull the next chunk from the stream.

When we reach the last chunk and there's no trailing delimiter, ReadLastItem reads the remaining bytes and deserializes them.

The code is almost identical to Steve Gordon's. Instead of writing to the Console, we write to the ChannelWriter.

private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;

private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence, 
                          bool isCompleted, CancellationToken token)
{
    var reader = new SequenceReader<byte>(sequence);

    while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
    {
        if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
        {
            var item=JsonSerializer.Deserialize<T>(itemBytes);
            writer.TryWrite(item);            
        }
        else if (isCompleted) // read last item which has no final delimiter
        {
            var item = ReadLastItem<T>(sequence.Slice(reader.Position));
            writer.TryWrite(item);
            reader.Advance(reader.Remaining); // advance reader to the end; advancing by sequence.Length could overshoot bytes already consumed
        }
        else // no more items in this sequence
        {
            break;
        }
    }

    return reader.Position;
}

private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
    var length = (int)sequence.Length;

    if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
    {
        Span<byte> byteBuffer = stackalloc byte[length];
        sequence.CopyTo(byteBuffer);
        var item=JsonSerializer.Deserialize<T>(byteBuffer);
        return item;        
    }
    else // otherwise we'll rent an array to use as the buffer
    {
        var byteBuffer = ArrayPool<byte>.Shared.Rent(length);

        try
        {
            sequence.CopyTo(byteBuffer);
            var item = JsonSerializer.Deserialize<T>(byteBuffer.AsSpan(0, length)); // rented arrays can be longer than requested, so pass only the valid bytes
            return item;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(byteBuffer);
        }

    }    
}

The DeserializeToChannel<T> method creates a PipeReader on top of the stream, creates a channel and starts a worker task that parses chunks and pushes them to the channel:

ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
    var pipeReader = PipeReader.Create(stream);    
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        while (!token.IsCancellationRequested)
        {
            var result = await pipeReader.ReadAsync(token); // read from the pipe

            var buffer = result.Buffer;

            var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer

            if (result.IsCompleted) 
                break; // exit if we've read everything from the pipe

            pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
        }

        pipeReader.Complete(); 
    },token)
    .ContinueWith(t=>{
        pipeReader.Complete();
        writer.TryComplete(t.Exception);
    });

    return channel.Reader;
}

ChannelReader.ReadAllAsync() can be used to consume all items through an IAsyncEnumerable<T>:

var reader = DeserializeToChannel<MyEvent>(stream, cts.Token);
await foreach (var item in reader.ReadAllAsync(cts.Token))
{
    //Do something with it
}
Answered Nov 03 '22 by Panagiotis Kanavos

Yes, a truly streaming JSON (de)serializer would be a nice performance improvement to have, in so many places.

Unfortunately, System.Text.Json does not do this at the time I'm writing this. I'm not sure if it will in the future - I hope so! Truly streaming deserialization of JSON turns out to be rather challenging.

You could check if the extremely fast Utf8Json supports it, perhaps.

However, there might be a custom solution for your specific situation, since your requirements seem to constrain the difficulty.

The idea is to manually read one item from the array at a time. We are making use of the fact that each item in the list is, in itself, a valid JSON object.

You can manually skip past the [ (for the first item) or the , (for each next item). Then I think your best bet is to use .NET Core's Utf8JsonReader to determine where the current object ends, and feed the scanned bytes to JsonSerializer.Deserialize.

This way, you're only buffering slightly over one object at a time.

And since we're talking performance, you could get the input from a PipeReader, while you're at it. :-)
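Here is a rough sketch of that idea (my own illustration, not Timo's code). For brevity it assumes the payload is already buffered; a real implementation would refill from the stream or PipeReader. Since Utf8JsonReader is a ref struct, it can't live in an iterator method, so this version collects results into a list:

// Illustration of the skip-and-scan idea: let Utf8JsonReader walk the array and
// let JsonSerializer.Deserialize(ref reader) consume exactly one element at a time.
static List<T> ReadArrayItems<T>(byte[] utf8Json)
{
    var items = new List<T>();
    var reader = new Utf8JsonReader(utf8Json);
    reader.Read();                                        // consume '['
    while (reader.Read() && reader.TokenType != JsonTokenType.EndArray)
    {
        // Deserialize(ref reader) reads one complete value and leaves the reader
        // on its last token, so each iteration scans only one element's tokens.
        items.Add(JsonSerializer.Deserialize<T>(ref reader)!);
    }
    return items;
}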

Answered Nov 03 '22 by Timo


I understand this is an old post, but the recently announced System.Text.Json support for IAsyncEnumerable in .NET 6 Preview 4 provides a solution to the problem mentioned in the OP.

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    // ResponseHeadersRead avoids buffering the entire response first (see the first answer)
    using (var httpResponse = await httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, cancellationToken))
    {
        using (var stream = await httpResponse.Content.ReadAsStreamAsync())
        {
            await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<T>(stream, cancellationToken: cancellationToken))
            {
                yield return item;
            }
        }
    }
}

This provides on-demand deserialization and is quite useful when working with large data. Please note that at the moment the feature is limited to root-level JSON arrays.

More details on the feature can be found here

Answered Nov 03 '22 by Anu Viswan