Let's say that I request a large JSON file that contains a list of many objects. I don't want them to be in memory all at once; I would rather read and process them one by one. So I need to turn an async System.IO.Stream into an IAsyncEnumerable<T>. How do I use the new System.Text.Json API to do this?
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
    {
        using (var stream = await httpResponse.Content.ReadAsStreamAsync())
        {
            // Probably do something with JsonSerializer.DeserializeAsync here without deserializing the entire thing in one go
        }
    }
}
TL;DR It's not trivial
Looks like someone already posted full code for a Utf8JsonStreamReader struct that reads buffers from a stream and feeds them to a Utf8JsonReader, allowing easy deserialization with JsonSerializer.Deserialize<T>(ref newJsonReader, options). The code isn't trivial either. The related question is here and the answer is here.
That's not enough though - HttpClient.GetAsync will return only after the entire response is received, essentially buffering everything in memory. To avoid this, HttpClient.GetAsync(string, HttpCompletionOption) should be used with HttpCompletionOption.ResponseHeadersRead.
The deserialization loop should check the cancellation token too, and either exit or throw if it's signalled. Otherwise the loop will go on until the entire stream is received and processed.
This code is based on the related answer's example, uses HttpCompletionOption.ResponseHeadersRead and checks the cancellation token. It can parse JSON strings that contain a proper array of items, e.g.:
[{"prop1":123},{"prop1":234}]
The first call to jsonStreamReader.Read() moves to the start of the array while the second moves to the start of the first object. The loop itself terminates when the end of the array (]) is detected.
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    //Don't cache the entire response
    using var httpResponse = await httpClient.GetAsync(url,
                                                       HttpCompletionOption.ResponseHeadersRead,
                                                       cancellationToken);
    using var stream = await httpResponse.Content.ReadAsStreamAsync();
    using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);

    jsonStreamReader.Read(); // move to array start
    jsonStreamReader.Read(); // move to start of the object

    while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
    {
        //Gracefully return if cancellation is requested.
        //Could be cancellationToken.ThrowIfCancellationRequested()
        if (cancellationToken.IsCancellationRequested)
        {
            return;
        }

        // deserialize object
        var obj = jsonStreamReader.Deserialize<T>();
        yield return obj;

        // JsonSerializer.Deserialize ends on last token of the object parsed,
        // move to the first token of next object
        jsonStreamReader.Read();
    }
}
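For reference, consuming this method could look like the following sketch; MyItem, the URL and the timeout are placeholders, not part of the original answer:

// Hypothetical consumer of GetList<T> above. Cancelling the token stops the loop
// without reading the rest of the response.
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await foreach (var item in GetList<MyItem>(new Uri("https://example.com/items"), cts.Token))
{
    Console.WriteLine(item);
}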
JSON fragments, AKA streaming JSON, AKA NDJSON (newline-delimited JSON)
It's quite common in event streaming or logging scenarios to append individual JSON objects to a file, one element per line, e.g.:
{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}
This isn't a valid JSON document, but the individual fragments are valid. This has several advantages for big data/highly concurrent scenarios. Adding a new event only requires appending a new line to the file, not parsing and rebuilding the entire file. Processing, especially parallel processing, is also easier, since each line is a self-contained document that can be read and parsed independently.
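For illustration, the write side can be as simple as serializing the event and appending one line. This is a minimal sketch; the MyEvent record and the file name are made up:

// Append-only write path for NDJSON (hypothetical type and path).
var line = JsonSerializer.Serialize(new MyEvent(1234568),
    new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase });
await File.AppendAllTextAsync("events.ndjson", line + "\n");

public record MyEvent(int EventId);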
Using a StreamReader
The allocate-y way to do this would be to use a TextReader, read one line at a time and parse it with JsonSerializer.Deserialize:
using var reader = new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken
while ((line = await reader.ReadLineAsync()) != null)
{
    var item = JsonSerializer.Deserialize<T>(line);
    yield return item;

    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }
}
That's a lot simpler than the code that deserializes a proper array. There are two issues: ReadLineAsync doesn't accept a cancellation token, and a string is allocated for every line anyway. This may be enough though, as trying to produce the ReadOnlySpan<byte> buffers needed by JsonSerializer.Deserialize isn't trivial.
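If cancelling the read itself matters, one workaround is to cancel the await instead. This is a sketch assuming .NET 6 or later, where Task.WaitAsync exists (.NET 7 later added a ReadLineAsync overload that accepts a CancellationToken directly):

// Variation of the loop above. WaitAsync abandons the await when the token fires;
// the underlying read still completes in the background.
private async IAsyncEnumerable<T> ReadJsonLines<T>(Stream stream,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    using var reader = new StreamReader(stream);
    string line;
    while ((line = await reader.ReadLineAsync().WaitAsync(cancellationToken)) != null)
    {
        yield return JsonSerializer.Deserialize<T>(line);
    }
}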
Pipelines and SequenceReader
To avoid allocations, we need to get a ReadOnlySpan<byte> from the stream. Doing this requires using System.IO.Pipelines pipes and the SequenceReader struct. Steve Gordon's An Introduction to SequenceReader explains how it can be used to read data from a stream using delimiters.
Unfortunately, SequenceReader is a ref struct, which means it can't be used in async or iterator methods. That's why Steve Gordon in his article creates a
private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)
method to read items from a ReadOnlySequence and return the ending position, so the PipeReader can resume from it. Unfortunately, we want to return an IEnumerable or IAsyncEnumerable, and iterator methods don't accept in or out parameters either.
We could collect the deserialized items in a List or Queue and return them as a single result, but that would still allocate lists, buffers or nodes and have to wait for all items in a buffer to be deserialized before returning:
private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)
We need something that acts like an enumerable without requiring an iterator method, works with async and doesn't buffer everything along the way.
Adding Channels to produce an IAsyncEnumerable
ChannelReader.ReadAllAsync returns an IAsyncEnumerable. We can return a ChannelReader from methods that couldn't work as iterators and still produce a stream of elements without caching.
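As a minimal illustration of that pattern (the names here are made up, not part of the answer's code):

// A non-iterator method returns a ChannelReader<int>; the caller consumes it as an
// IAsyncEnumerable<int> through ReadAllAsync.
static ChannelReader<int> ProduceNumbers(CancellationToken token)
{
    var channel = Channel.CreateUnbounded<int>();
    _ = Task.Run(async () =>
    {
        try
        {
            for (var i = 0; i < 10; i++)
            {
                await channel.Writer.WriteAsync(i, token);
            }
            channel.Writer.TryComplete();
        }
        catch (Exception ex)
        {
            channel.Writer.TryComplete(ex);
        }
    }, token);
    return channel.Reader;
}

// Consuming side:
// await foreach (var n in ProduceNumbers(someToken).ReadAllAsync())
//     Console.WriteLine(n);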
Adapting Steve Gordon's code to use channels, we get the ReadItems(ChannelWriter...) and ReadLastItem methods. The first one reads one item at a time, up to a newline, using ReadOnlySpan<byte> itemBytes. This can be used by JsonSerializer.Deserialize. If ReadItems can't find the delimiter, it returns its position so the PipeReader can pull the next chunk from the stream.
When we reach the last chunk and there's no other delimiter, ReadLastItem reads the remaining bytes and deserializes them.
The code is almost identical to Steve Gordon's. Instead of writing to the Console, we write to the ChannelWriter.
private const byte NL = (byte)'\n';
private const int MaxStackLength = 128;

private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence,
    bool isCompleted, CancellationToken token)
{
    var reader = new SequenceReader<byte>(sequence);

    while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
    {
        if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
        {
            var item = JsonSerializer.Deserialize<T>(itemBytes);
            writer.TryWrite(item);
        }
        else if (isCompleted) // read last item which has no final delimiter
        {
            var item = ReadLastItem<T>(sequence.Slice(reader.Position));
            writer.TryWrite(item);
            reader.Advance(sequence.Length); // advance reader to the end
        }
        else // no more items in this sequence
        {
            break;
        }
    }

    return reader.Position;
}
private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
    var length = (int)sequence.Length;
    if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
    {
        Span<byte> byteBuffer = stackalloc byte[length];
        sequence.CopyTo(byteBuffer);
        var item = JsonSerializer.Deserialize<T>(byteBuffer);
        return item;
    }
    else // otherwise we'll rent an array to use as the buffer
    {
        var byteBuffer = ArrayPool<byte>.Shared.Rent(length);
        try
        {
            sequence.CopyTo(byteBuffer);
            // the rented array may be larger than requested, so only pass the valid bytes
            var item = JsonSerializer.Deserialize<T>(byteBuffer.AsSpan(0, length));
            return item;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(byteBuffer);
        }
    }
}
The DeserializeToChannel<T> method creates a PipeReader on top of the stream, creates a channel and starts a worker task that parses chunks and pushes them to the channel:
ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
    var pipeReader = PipeReader.Create(stream);
    var channel = Channel.CreateUnbounded<T>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
        {
            while (!token.IsCancellationRequested)
            {
                var result = await pipeReader.ReadAsync(token); // read from the pipe
                var buffer = result.Buffer;
                var position = ReadItems(writer, buffer, result.IsCompleted, token); // read complete items from the current buffer
                if (result.IsCompleted)
                    break; // exit if we've read everything from the pipe
                pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
            }
            pipeReader.Complete();
        }, token)
        .ContinueWith(t =>
        {
            pipeReader.Complete();
            writer.TryComplete(t.Exception);
        });

    return channel.Reader;
}
ChannelReader.ReadAllAsync() can be used to consume all items through an IAsyncEnumerable<T>:
var reader = DeserializeToChannel<MyEvent>(stream, cts.Token);
await foreach (var item in reader.ReadAllAsync(cts.Token))
{
    //Do something with it
}
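If an IAsyncEnumerable<T> signature is preferred, a thin wrapper (not part of the original answer, just a sketch) can hide the channel entirely:

// Convenience wrapper: expose the pipe/channel pipeline directly as an IAsyncEnumerable<T>.
IAsyncEnumerable<T> DeserializeJsonLines<T>(Stream stream, CancellationToken token = default)
    => DeserializeToChannel<T>(stream, token).ReadAllAsync(token);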
Yes, a truly streaming JSON (de)serializer would be a nice performance improvement to have, in so many places.
Unfortunately, System.Text.Json does not do this at the time I'm writing this. I'm not sure if it will in the future - I hope so! Truly streaming deserialization of JSON turns out to be rather challenging.
You could check if the extremely fast Utf8Json supports it, perhaps.
However, there might be a custom solution for your specific situation, since your requirements seem to constrain the difficulty.
The idea is to manually read one item from the array at a time. We are making use of the fact that each item in the list is, in itself, a valid JSON object.
You can manually skip past the [ (for the first item) or the , (for each next item). Then I think your best bet is to use .NET Core's Utf8JsonReader to determine where the current object ends, and feed the scanned bytes to JsonSerializer.Deserialize. This way, you're only buffering slightly over one object at a time.
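A rough sketch of that idea follows. For simplicity it assumes the whole payload is already buffered in memory; a real implementation would refill the buffer from the Stream or PipeReader whenever the reader runs out of data, and because Utf8JsonReader is a ref struct the results are collected into a list rather than yielded:

// Simplified sketch: use Utf8JsonReader only to find each object's byte range,
// then hand exactly those bytes to JsonSerializer. Buffer refilling is omitted.
static List<T> ReadArrayItems<T>(ReadOnlySpan<byte> utf8Json)
{
    var results = new List<T>();
    var reader = new Utf8JsonReader(utf8Json);
    reader.Read();                                  // positions on '['
    while (reader.Read() && reader.TokenType != JsonTokenType.EndArray)
    {
        var start = (int)reader.TokenStartIndex;    // first byte of the current object
        reader.Skip();                              // move to the matching '}'
        var end = (int)reader.BytesConsumed;        // one past the object's last byte
        results.Add(JsonSerializer.Deserialize<T>(utf8Json.Slice(start, end - start)));
    }
    return results;
}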
And since we're talking performance, you could get the input from a PipeReader, while you're at it. :-)
I understand this is an old post, but the recently announced System.Text.Json support for IAsyncEnumerable in .NET 6 Preview 4 provides a solution to the problem mentioned in the OP.
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
    {
        using (var stream = await httpResponse.Content.ReadAsStreamAsync())
        {
            await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<T>(stream))
            {
                yield return item;
            }
        }
    }
}
This provides on-demand deserialization and is quite useful when working with large data. Please note that at the moment the feature is limited to root-level JSON arrays.
More details on the feature can be found here.
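For completeness, DeserializeAsyncEnumerable also accepts JsonSerializerOptions and a CancellationToken, so the loop above could forward both. MyEvent here is a placeholder type:

// Hypothetical call passing serializer options and the caller's token.
var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<MyEvent>(stream, options, cancellationToken))
{
    // process item
}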