I have the following WebApi method that returns an unbounded result stream from RavenDB:
public IEnumerable<Foo> Get()
{
var query = DocumentSession.Query<Foo, FooIndex>();
using (var enumerator = DocumentSession.Advanced.Stream(query))
while (enumerator.MoveNext())
yield return enumerator.Current.Document;
}
Now I'd like to make that async. The naive approach of course doesn't work:
public async Task<IEnumerable<Location>> Get()
{
var query = AsyncDocumentSession.Query<Foo, FooIndex>();
using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
while (await enumerator.MoveNextAsync())
yield return enumerator.Current.Document;
}
...because the method can't be both async and an iterator.
Since this is a WebAPI action method, HTTP restricts you to a single response. If you just return an IEnumerable<T>
, then ASP.NET will enumerate it in-memory and then send the response.
If you're fine with this in-memory process, then you can just do the same thing yourself:
public async Task<List<Location>> Get()
{
var result = new List<Location>();
var query = AsyncDocumentSession.Query<Foo, FooIndex>();
using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
while (await enumerator.MoveNextAsync())
result.Add(enumerator.Current.Document);
return result;
}
However, I believe it would be better to use a streamed response, which you can get via PushStreamContent
; something like this:
public HttpResponseMessage Get()
{
var query = AsyncDocumentSession.Query<Foo, FooIndex>();
HttpResponseMessage response = Request.CreateResponse();
response.Content = new PushStreamContent(
async (stream, content, context) =>
{
using (stream)
using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
{
while (await enumerator.MoveNextAsync())
{
// TODO: adjust encoding as necessary.
var serialized = JsonConvert.SerializeObject(enumerator.CurrentDocument);
var data = UTF8Encoding.UTF8.GetBytes(serialized);
var countPrefix = BitConverter.GetBytes(data.Length);
await stream.WriteAsync(countPrefix, 0, countPrefix.Length);
await stream.WriteAsync(data, 0, data.Length);
}
}
});
return response;
}
The streamed response doesn't require your server to hold the entire response in memory; however, you'll have to decide on the proper way to write documents to the response stream. The example code above just converts them to JSON, encodes in UTF8, and (binary) length-prefixes those strings.
You could implement your own iterator, instead of letting the compiler generate one for you.
But, calling MoveNext
on that iterator would have to be async as well - meaning you can't implement IEnumerable<T>
`IEnumerator, you'd have to define your own interface, e.g.,
IAsyncEnumerator`.
And you wouldn't be able to use that iterator in a foreach loop either.
The way I see it, your best bet is to do what StreamAsync
does. Create a custom type IAsyncEnumerable
that returns an IAsyncEnumerator<T>
that implements a custom async T MoveNextAsync()
method. The enumerable would wrap your query
object, and the enumerator would fetch a document session's document.
internal class AsyncDocumentEnumerable : IAsyncEnumerable<Document>
{
private readonly YourQueryType _query;
public AsyncDocumentEnumerable(YourQueryType query)
{
_query = query;
}
IAsyncEnumerator<Document> GetEnumerator()
{
return new AsyncDocumentEnumerator(_query);
}
}
internal class AsyncDocumentEnumerator : IAsyncDocumentEnumerator<Document>
{
private readonly YourQueryType _query;
private IAsyncEnumerator<DocumentSession> _iter;
public AsyncDocumentEnumerator(YourQueryType query)
{
_query = query;
}
public Task<bool> async MoveNextAsync()
{
if(_iter == null)
_iter = await AsyncDocumentSession.Advanced.StreamAsync(query);
bool moved = await _iter.MoveNextAsync();
if(moved)
Current = _iter.Current.Document;
return moved;
}
public Document Current{get; private set;}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With