I am using Dapper to stream data from a very large set in SQL Server. It works fine with returning IEnumerable and calling Query(), but when I switch to QueryAsync(), it seems that the program tries to read all of the data from SQL Server instead of streaming.
According to this question, it should work fine with buffered: false, which I am doing, but the question says nothing about async/await.
Now according to this question, it's not straightforward to do what I want with QueryAsync().
Do I understand correctly that enumerables are iterated when the context is switched for async/await?
Another question: will this become possible once the new C# 8 async streams are available?
In the context of dapper specifically, yes: it needs a different API as explained by the excellent answer by @Panagiotis. What follows isn't an answer as such, but is additional context that implementors facing the same challenges may wish to consider.
I haven't "spiked" this for dapper yet (although I have for SE.Redis), and I'm torn between various options:
We'll probably go with "1", but I have to say, the second option is unusually tempting, for good reasons:
But the odd thing is the .NET Core 3.0-ness of IAsyncEnumerable<T> - as obviously Dapper doesn't just target .NET Core 3.0; we could:
IAsyncEnumerable<T>
IAsyncEnumerable<T>
IAsyncEnumerable<T>
IAsyncEnumerable<T> (but which implements IAsyncEnumerable<T> when available), and manually implement the state machine - the duck-typed nature of foreach means this will work fine as long as our custom enumerable type provides the right methods
I think we'll probably go with option 3, but to reiterate: yes, something needs to change.
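To illustrate the duck-typing point above: await foreach binds to methods by name and shape, so a type does not have to implement IAsyncEnumerable<T> to be enumerated. The following is only a minimal sketch, assuming C# 8 and .NET Core 3.0 or later; CountingSequence and its nested Enumerator are hypothetical names invented for this example and are not part of Dapper.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical type for illustration: it does NOT implement IAsyncEnumerable<int>.
public sealed class CountingSequence
{
    private readonly int _count;
    public CountingSequence(int count) => _count = count;

    // `await foreach` looks this method up by name (pattern-based binding).
    public Enumerator GetAsyncEnumerator() => new Enumerator(_count);

    public sealed class Enumerator
    {
        private readonly int _count;
        private int _next;
        internal Enumerator(int count) => _count = count;

        // The pattern requires a readable Current property...
        public int Current { get; private set; }

        // ...and a MoveNextAsync() whose awaited result is a bool.
        // This plays the role of the hand-written state machine.
        public async ValueTask<bool> MoveNextAsync()
        {
            if (_next >= _count) return false;
            await Task.Yield(); // stand-in for real asynchronous I/O
            Current = _next++;
            return true;
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Compiles and runs even though no interface is implemented.
        await foreach (var value in new CountingSequence(3))
        {
            Console.WriteLine(value); // writes 0, 1, 2 on separate lines
        }
    }
}
```

A type shaped like this could additionally implement IAsyncEnumerable<T> on targets where the interface exists, which is the "when available" part of that option.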
Update March 2020
.NET Core 3.0 (and 3.1) have come out now, with full support for async streams. The Microsoft.Bcl.AsyncInterfaces package adds support for them to .NET Standard 2.0 and .NET Framework 4.6.1+, although 4.7.2 should be used for sanity reasons. As the docs on .NET Standard implementation support explain:
While NuGet considers .NET Framework 4.6.1 as supporting .NET Standard 1.5 through 2.0, there are several issues with consuming .NET Standard libraries that were built for those versions from .NET Framework 4.6.1 projects.
For .NET Framework projects that need to use such libraries, we recommend that you upgrade the project to target .NET Framework 4.7.2 or higher.
Original Answer
If you check the source code, you'll see that your suspicion is almost correct. When buffered is false, QueryAsync will stream synchronously.
    if (command.Buffered)
    {
        var buffer = new List<T>();
        var convertToType = Nullable.GetUnderlyingType(effectiveType) ?? effectiveType;
        while (await reader.ReadAsync(cancel).ConfigureAwait(false))
        {
            object val = func(reader);
            if (val == null || val is T)
            {
                buffer.Add((T)val);
            }
            else
            {
                buffer.Add((T)Convert.ChangeType(val, convertToType, CultureInfo.InvariantCulture));
            }
        }
        while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
        command.OnCompleted();
        return buffer;
    }
    else
    {
        // can't use ReadAsync / cancellation; but this will have to do
        wasClosed = false; // don't close if handing back an open reader; rely on the command-behavior
        var deferred = ExecuteReaderSync<T>(reader, func, command.Parameters);
        reader = null; // to prevent it being disposed before the caller gets to see it
        return deferred;
    }
As the comment explains, it's not possible to use ReadAsync when the return type is expected to be IEnumerable. That's why C# 8's async enumerables had to be introduced.
The code for ExecuteReaderSync is:
    private static IEnumerable<T> ExecuteReaderSync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
    {
        using (reader)
        {
            while (reader.Read())
            {
                yield return (T)func(reader);
            }
            while (reader.NextResult()) { /* ignore subsequent result sets */ }
            (parameters as IParameterCallbacks)?.OnCompleted();
        }
    }
It uses Read instead of ReadAsync.
C# 8 async streams will allow rewriting this to return an IAsyncEnumerable. Simply changing the language version won't solve the problem.
Given the current docs on async streams, this could look like:
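    // Sketch only. Needs System.Data.Common, System.Threading and System.Runtime.CompilerServices.
    private static async IAsyncEnumerable<T> ExecuteReaderAsync<T>(DbDataReader reader, Func<IDataReader, object> func, object parameters, [EnumeratorCancellation] CancellationToken cancel = default)
    {
        // DbDataReader rather than IDataReader: only DbDataReader exposes ReadAsync / NextResultAsync
        using (reader)
        {
            while (await reader.ReadAsync(cancel).ConfigureAwait(false))
            {
                yield return (T)func(reader);
            }
            while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
            // no command object is in scope here, so notify the parameters directly
            (parameters as IParameterCallbacks)?.OnCompleted();
        }
    }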
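For completeness, a caller would consume a method like this with await foreach. This is a rough sketch rather than Dapper code: ReadRowsAsync is a hypothetical in-memory stand-in for a reader-backed stream, and Task.Delay only simulates the wait that ReadAsync would incur. It assumes C# 8 on .NET Core 3.0 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical stand-in for a reader-backed async stream (not a Dapper API).
    private static async IAsyncEnumerable<int> ReadRowsAsync(
        int rowCount,
        [EnumeratorCancellation] CancellationToken cancel = default)
    {
        for (var i = 0; i < rowCount; i++)
        {
            // Simulates the asynchronous wait for the next row.
            await Task.Delay(10, cancel).ConfigureAwait(false);
            yield return i; // each row is handed to the caller as soon as it is "read"
        }
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();

        // WithCancellation flows the token into the [EnumeratorCancellation] parameter.
        await foreach (var row in ReadRowsAsync(5).WithCancellation(cts.Token))
        {
            Console.WriteLine(row);
        }
    }
}
```

The loop body runs once per row as it arrives, so nothing is buffered, and no thread is blocked while waiting for the next row.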
Buuuuuut async streams is one of the things that can only work on .NET Core, and probably isn't implemented yet. When I tried to write one in Sharplab.io, Kaboom. [connection lost, reconnecting…]