Returning async stream of query results

I have the following WebApi method that returns an unbounded result stream from RavenDB:

public IEnumerable<Foo> Get()
{
    var query = DocumentSession.Query<Foo, FooIndex>();
    using (var enumerator = DocumentSession.Advanced.Stream(query))
        while (enumerator.MoveNext())
            yield return enumerator.Current.Document;
}

Now I'd like to make that async. The naive approach of course doesn't work:

public async Task<IEnumerable<Foo>> Get()
{
    var query = AsyncDocumentSession.Query<Foo, FooIndex>();
    using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
        while (await enumerator.MoveNextAsync())
            yield return enumerator.Current.Document;
}

...because the method can't be both async and an iterator.

Diego Mijelshon asked May 30 '14 14:05


2 Answers

Since this is a WebAPI action method, HTTP restricts you to a single response. If you just return an IEnumerable<T>, then ASP.NET will enumerate it in-memory and then send the response.

If you're fine with this in-memory process, then you can just do the same thing yourself:

public async Task<List<Foo>> Get()
{
  var result = new List<Foo>();
  var query = AsyncDocumentSession.Query<Foo, FooIndex>();
  using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
    while (await enumerator.MoveNextAsync())
      result.Add(enumerator.Current.Document);
  return result;
}

However, I believe it would be better to use a streamed response, which you can get via PushStreamContent; something like this:

public HttpResponseMessage Get()
{
  var query = AsyncDocumentSession.Query<Foo, FooIndex>();
  HttpResponseMessage response = Request.CreateResponse();
  response.Content = new PushStreamContent(
      async (stream, content, context) =>
      {
        using (stream)
        using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
        {
          while (await enumerator.MoveNextAsync())
          {
            // TODO: adjust encoding as necessary.
            var serialized = JsonConvert.SerializeObject(enumerator.Current.Document);
            var data = Encoding.UTF8.GetBytes(serialized);
            var countPrefix = BitConverter.GetBytes(data.Length);
            await stream.WriteAsync(countPrefix, 0, countPrefix.Length);
            await stream.WriteAsync(data, 0, data.Length);
          }
        }
      });
  return response;
}

The streamed response doesn't require your server to hold the entire response in memory; however, you'll have to decide on the proper way to write documents to the response stream. The example code above just serializes each document to JSON, encodes it as UTF-8, and prefixes each encoded string with its length as a binary integer.
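
For the consuming side, here is a minimal sketch of how a client might decode that length-prefixed format. The names LengthPrefixedReader, ReadAllAsync, and onDocument are made up for illustration, and the sketch assumes the client has the same byte order as the server (BitConverter uses the machine's native endianness on both ends).

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper: decodes frames written as [4-byte length][UTF-8 JSON].
public static class LengthPrefixedReader
{
    // Fills the whole buffer, looping because ReadAsync may return fewer bytes than requested.
    // Returns false only if the stream ends before the first byte of the buffer.
    private static async Task<bool> FillAsync(Stream stream, byte[] buffer)
    {
        int offset = 0;
        while (offset < buffer.Length)
        {
            int read = await stream.ReadAsync(buffer, offset, buffer.Length - offset);
            if (read == 0)
            {
                if (offset == 0) return false; // clean end of stream
                throw new EndOfStreamException("Stream ended in the middle of a frame.");
            }
            offset += read;
        }
        return true;
    }

    // Invokes onDocument with the JSON text of each frame as soon as it has been read.
    public static async Task ReadAllAsync(Stream stream, Action<string> onDocument)
    {
        var prefix = new byte[sizeof(int)];
        while (await FillAsync(stream, prefix))
        {
            // Assumes the same endianness as the writer (see the note above).
            int length = BitConverter.ToInt32(prefix, 0);
            var payload = new byte[length];
            if (length > 0 && !await FillAsync(stream, payload))
                throw new EndOfStreamException("Length prefix without a payload.");
            onDocument(Encoding.UTF8.GetString(payload));
        }
    }
}
```

You would pass it the response stream, e.g. one obtained from HttpClient with HttpCompletionOption.ResponseHeadersRead and response.Content.ReadAsStreamAsync(), so that documents are handled as they arrive rather than after the whole body has been buffered.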

Stephen Cleary answered Sep 27 '22 19:09


You could implement your own iterator, instead of letting the compiler generate one for you.

But calling MoveNext on that iterator would have to be async as well, meaning you can't implement IEnumerable<T>/IEnumerator<T>; you'd have to define your own interface, e.g., IAsyncEnumerator<T>. And you wouldn't be able to use that iterator in a foreach loop either.

The way I see it, your best bet is to do what StreamAsync does. Create a custom type IAsyncEnumerable<T> that returns an IAsyncEnumerator<T>, which in turn exposes a custom Task<bool> MoveNextAsync() method. The enumerable would wrap your query object, and the enumerator would pull each document from the session's stream.

internal class AsyncDocumentEnumerable : IAsyncEnumerable<Document>
{
    private readonly YourQueryType _query;
    public AsyncDocumentEnumerable(YourQueryType query)
    {
        _query = query;
    }

    public IAsyncEnumerator<Document> GetEnumerator()
    {
        return new AsyncDocumentEnumerator(_query);
    }
}


internal class AsyncDocumentEnumerator : IAsyncEnumerator<Document>
{
    private readonly YourQueryType _query;
    // Underlying RavenDB stream, opened lazily on the first MoveNextAsync call.
    // Assumes StreamAsync yields StreamResult<Document> items, since .Current.Document is read below.
    private IAsyncEnumerator<StreamResult<Document>> _iter;

    public AsyncDocumentEnumerator(YourQueryType query)
    {
        _query = query;
    }

    public async Task<bool> MoveNextAsync()
    {
        if(_iter == null)
            _iter = await AsyncDocumentSession.Advanced.StreamAsync(_query);

        bool moved = await _iter.MoveNextAsync();

        if(moved)
            Current = _iter.Current.Document;
        return moved;
    }

    public Document Current{get; private set;}
}
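
For completeness, here is a rough sketch of what the two custom interfaces could look like, plus a helper that stands in for the foreach loop you lose. Everything here is hypothetical: the namespace AsyncIterationSketch, the interface shapes, and the ForEachAsync extension are my own naming, not something RavenDB or the framework gives you under these definitions.

```csharp
using System;
using System.Threading.Tasks;

// Wrapped in its own namespace so these illustrative interfaces don't clash
// with any identically named types from other libraries.
namespace AsyncIterationSketch
{
    // Hypothetical async counterpart of IEnumerator<T>.
    public interface IAsyncEnumerator<T>
    {
        T Current { get; }
        Task<bool> MoveNextAsync();
    }

    // Hypothetical async counterpart of IEnumerable<T>.
    public interface IAsyncEnumerable<T>
    {
        IAsyncEnumerator<T> GetEnumerator();
    }

    public static class AsyncEnumerableExtensions
    {
        // Replaces foreach: awaits each MoveNextAsync and hands the current item to the callback.
        public static async Task ForEachAsync<T>(this IAsyncEnumerable<T> source, Action<T> action)
        {
            var enumerator = source.GetEnumerator();
            while (await enumerator.MoveNextAsync())
                action(enumerator.Current);
        }
    }
}
```

With that in place, a caller could write something like await new AsyncDocumentEnumerable(query).ForEachAsync(doc => Process(doc)), where Process is whatever you want to do with each document.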

dcastro answered Sep 27 '22 21:09