What happens with returning IEnumerable if used with async/await (streaming data from SQL Server with Dapper)?

I am using Dapper to stream data from a very large set in SQL Server. It works fine with returning IEnumerable and calling Query(), but when I switch to QueryAsync(), it seems that the program tries to read all of the data from SQL Server instead of streaming.

According to this question, it should work fine with buffered: false, which I am doing, but the question says nothing about async/await.

Now according to this question, it's not straightforward to do what I want with QueryAsync().

Do I understand correctly that enumerables are iterated when the context is switched for async/await?

Another question: will this become possible once the new C# 8 async streams are available?

Ilya Chernomordik asked Apr 05 '19 13:04


2 Answers

In the context of Dapper specifically, yes: it needs a different API, as explained in the excellent answer by @Panagiotis. What follows isn't an answer as such, but is additional context that implementors facing the same challenges may wish to consider.

I haven't "spiked" this for Dapper yet (although I have for SE.Redis), and I'm torn between various options:

  1. add a new API for .NET Core only, returning an appropriate async-enumerable type
  2. completely smash the existing API as a breaking change (a "major" version bump), changing it to return an async-enumerable type

We'll probably go with "1", but I have to say, the second option is unusually tempting, for good reasons:

  • the existing API probably doesn't do what people expect it to do
  • we'd want new code to start using it

But the odd thing is the .NET Core 3.0-ness of IAsyncEnumerable<T> - as obviously Dapper doesn't just target .NET Core 3.0; we could:

  1. limit the feature to .NET Core 3.0, and return IAsyncEnumerable<T>
  2. limit the library to .NET Core 3.0, and return IAsyncEnumerable<T>
  3. take a dependency on System.Linq.Async (which isn't "official", but is official-enough for our purposes) for the previous frameworks, and return IAsyncEnumerable<T>
  4. return a custom enumerable type that isn't actually IAsyncEnumerable<T> (but which implements IAsyncEnumerable<T> when available), and manually implement the state machine - the duck-typed nature of foreach means this will work fine as long as our custom enumerable type provides the right methods
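
To illustrate the duck-typing point in option 4, here is a minimal sketch of a type that does not implement IAsyncEnumerable<T> yet still works with await foreach. CountingSequence, its Enumerator and DuckTypingDemo are hypothetical names made up for this example, not anything in Dapper, and Task.Yield() merely stands in for real asynchronous I/O. It assumes a C# 8 compiler and a runtime that provides ValueTask<T>.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical type: deliberately does NOT implement IAsyncEnumerable<int>.
public sealed class CountingSequence
{
    private readonly int _count;
    public CountingSequence(int count) => _count = count;

    // 'await foreach' only needs a GetAsyncEnumerator() it can call with no arguments...
    public Enumerator GetAsyncEnumerator() => new Enumerator(_count);

    public sealed class Enumerator
    {
        private readonly int _count;
        private int _next;
        public Enumerator(int count) => _count = count;

        // ...whose result exposes Current and an awaitable MoveNextAsync() yielding bool.
        public int Current { get; private set; }

        public async ValueTask<bool> MoveNextAsync()
        {
            await Task.Yield(); // stand-in for real async work such as ReadAsync
            if (_next >= _count) return false;
            Current = _next++;
            return true;
        }
    }
}

public static class DuckTypingDemo
{
    // Collects the items to show that the pattern-based enumeration compiles and runs.
    public static async Task<List<int>> CollectAsync(int count)
    {
        var results = new List<int>();
        await foreach (var item in new CountingSequence(count))
        {
            results.Add(item);
        }
        return results;
    }
}
```

Awaiting DuckTypingDemo.CollectAsync(3) should produce the items 0, 1 and 2. A library could ship a type shaped like this on older frameworks and additionally implement the real interface where it is available.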

I think we'll probably go with option 3, but to reiterate: yes, something needs to change.

Marc Gravell answered Sep 17 '22 14:09


Update March 2020

.NET Core 3.0 (and 3.1) have come out now, with full support for async streams. The Microsoft.Bcl.AsyncInterfaces package adds support for them to .NET Standard 2.0 and .NET Framework 4.6.1+, although 4.7.2 should be used for sanity reasons. As the docs on .NET Standard implementation support explain:

While NuGet considers .NET Framework 4.6.1 as supporting .NET Standard 1.5 through 2.0, there are several issues with consuming .NET Standard libraries that were built for those versions from .NET Framework 4.6.1 projects.

For .NET Framework projects that need to use such libraries, we recommend that you upgrade the project to target .NET Framework 4.7.2 or higher.

Original Answer

If you check the source code, you'll see that your suspicion is almost correct. When buffered is false, QueryAsync will stream synchronously.

if (command.Buffered)
{
    var buffer = new List<T>();
    var convertToType = Nullable.GetUnderlyingType(effectiveType) ?? effectiveType;
    while (await reader.ReadAsync(cancel).ConfigureAwait(false))
    {
        object val = func(reader);
        if (val == null || val is T)
        {
            buffer.Add((T)val);
        }
        else
        {
            buffer.Add((T)Convert.ChangeType(val, convertToType, CultureInfo.InvariantCulture));
        }
    }
    while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
    command.OnCompleted();
    return buffer;
}
else
{
    // can't use ReadAsync / cancellation; but this will have to do
    wasClosed = false; // don't close if handing back an open reader; rely on the command-behavior
    var deferred = ExecuteReaderSync<T>(reader, func, command.Parameters);
    reader = null; // to prevent it being disposed before the caller gets to see it
    return deferred;
}

As the comment explains, it's not possible to use ReadAsync when the return type is expected to be IEnumerable<T>, because its MoveNext() is synchronous and cannot await. That's why C# 8's async enumerables had to be introduced.

The code for ExecuteReaderSync is:

private static IEnumerable<T> ExecuteReaderSync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
    using (reader)
    {
        while (reader.Read())
        {
            yield return (T)func(reader);
        }
        while (reader.NextResult()) { /* ignore subsequent result sets */ }
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}

It uses Read instead of ReadAsync.
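
The effect on the caller can be reproduced without a database. The following is only a sketch under stated assumptions: QueryLikeAsync and ReadRowsSync are hypothetical stand-ins that mimic the shape of the unbuffered branch (an awaited setup step followed by a deferred, synchronous iterator), and the log list exists purely to make the ordering visible.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DeferredDemo
{
    // Hypothetical stand-in for QueryAsync with buffered: false.
    public static async Task<IEnumerable<int>> QueryLikeAsync(List<string> log)
    {
        await Task.Yield();            // stand-in for awaiting the command execution
        log.Add("query awaited");
        return ReadRowsSync(log);      // deferred: no row has been read at this point
    }

    // Plays the role of ExecuteReaderSync: a plain iterator with no awaits inside.
    private static IEnumerable<int> ReadRowsSync(List<string> log)
    {
        for (var i = 0; i < 3; i++)
        {
            log.Add("sync read " + i); // runs inside MoveNext(), blocking the caller
            yield return i;
        }
    }
}
```

After awaiting QueryLikeAsync, the log holds only the "query awaited" entry. The "sync read" entries appear one by one as the caller's foreach pulls items, which is why the rows are still streamed but each read blocks the calling thread.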

C# 8 async streams will allow rewriting this to return an IAsyncEnumerable<T>. Simply changing the language version won't solve the problem.

Given the current docs on async streams this could look like:

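// Needs: using System.Data.Common; using System.Runtime.CompilerServices; using System.Threading;
private static async IAsyncEnumerable<T> ExecuteReaderAsync<T>(DbDataReader reader, Func<IDataReader, object> func, object parameters, [EnumeratorCancellation] CancellationToken cancel = default)
{
    // ReadAsync and NextResultAsync are defined on DbDataReader, not on the IDataReader interface
    using (reader)
    {
        while (await reader.ReadAsync(cancel).ConfigureAwait(false))
        {
            yield return (T)func(reader);
        }

        while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}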

Buuuuuut async streams are one of the things that can only work on .NET Core, and probably aren't implemented yet. When I tried to write one in Sharplab.io, Kaboom: [connection lost, reconnecting…]
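
With async streams now shipped (see the March 2020 update above), consuming such a method might look like the sketch below. It does not touch Dapper or a database: ReadRowsAsync is a hypothetical stand-in that simulates rows arriving with a short delay, and SumFirstAsync is an invented consumer that shows rows being pulled one at a time and the stream being abandoned early.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncStreamDemo
{
    // Hypothetical row source; a real one would await reader.ReadAsync(cancel) instead.
    public static async IAsyncEnumerable<int> ReadRowsAsync(
        int rowCount,
        [EnumeratorCancellation] CancellationToken cancel = default)
    {
        for (var i = 0; i < rowCount; i++)
        {
            await Task.Delay(1, cancel).ConfigureAwait(false); // simulated I/O wait
            yield return i;
        }
    }

    // Sums the first 'take' rows (take >= 1), then stops enumerating.
    public static async Task<int> SumFirstAsync(int take, CancellationToken cancel = default)
    {
        var sum = 0;
        var seen = 0;
        await foreach (var row in ReadRowsAsync(1_000_000, cancel))
        {
            sum += row;
            if (++seen == take) break; // the remaining rows are never produced
        }
        return sum;
    }
}
```

Because each row is produced only when the consumer asks for it, breaking out of the loop after a few items means the rest of the million simulated rows are never generated, and no thread is blocked while waiting for the next one.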

Panagiotis Kanavos answered Sep 17 '22 14:09