We have recently migrated our ASP.NET Core API, which uses Dapper, to .NET Core 3.1. After the migration, we felt there was an opportunity to use the new IAsyncEnumerable feature from C# 8 for one of our endpoints.
Here is the pseudocode before the changes:
public async Task<IEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
        param: new
        {
            Id = id
        });
    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
        return null;
    }
    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);
    return Stream(reader, items);
}

private IEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
        foreach (var item in items)
        {
            yield return item;
        }
    }
}
After the IAsyncEnumerable code changes:
// Import NuGet package: System.Linq.Async
public async Task<IAsyncEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
        param: new
        {
            Id = id
        });
    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
        return null;
    }
    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);
    return Stream(reader, items);
}

private async IAsyncEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
        await foreach (var item in items.ToAsyncEnumerable())
        {
            yield return item;
        }
    }
}
The above approach, which uses ToAsyncEnumerable, is loosely inspired by this post, but I'm not 100% sure I'm using it in the right place/context.
Question:
Dapper only returns IEnumerable here, but can we use ToAsyncEnumerable to convert it into IAsyncEnumerable for an async stream like above?
Note: This question looks similar to "What happens with returning IEnumerable if used with async/await (streaming data from SQL Server with Dapper)?" but I do not think that answers my question.
Update: I wasn't aware of async iterators when I first wrote this answer. Thanks to Theodor Zoulias for pointing it out. In light of that, a much simpler approach is possible:
using var reader = await connection.ExecuteReaderAsync(query, parameters);
var rowParser = reader.GetRowParser<T>();
// Consider using reader.NextResultAsync(); see the GitHub issue in the Ref below for details.
while (await reader.ReadAsync()) {
    yield return rowParser(reader);
}
Ref: https://github.com/DapperLib/Dapper/issues/1239#issuecomment-1035507322
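To place the snippet above in context, here is a minimal sketch of how it might look as a complete async iterator method. The class name `DapperStreamingExtensions` and the method name `QueryStreamAsync` are hypothetical, not part of Dapper. The sketch assumes Dapper 2.x, where `ExecuteReaderAsync` on a `DbConnection` returns a `DbDataReader`, and a runtime where `DbDataReader` supports `await using` (.NET Core 3.0 or later).

```csharp
using System.Collections.Generic;
using System.Data.Common;
using System.Runtime.CompilerServices;
using System.Threading;
using Dapper;

public static class DapperStreamingExtensions
{
    // Hypothetical helper (not a Dapper API): streams rows one at a time
    // instead of buffering the whole result set.
    public static async IAsyncEnumerable<T> QueryStreamAsync<T>(
        this DbConnection connection,
        string sql,
        object parameters = null,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // CommandDefinition lets the cancellation token reach the command.
        var command = new CommandDefinition(
            sql, parameters, cancellationToken: cancellationToken);

        // Assumption: Dapper 2.x returns a DbDataReader for a DbConnection.
        await using var reader = await connection.ExecuteReaderAsync(command);

        // Dapper builds the row-to-T mapper once, from the reader's columns.
        var rowParser = reader.GetRowParser<T>();

        // Each row is fetched asynchronously and handed to the caller
        // before the next one is read.
        while (await reader.ReadAsync(cancellationToken))
        {
            yield return rowParser(reader);
        }
    }
}
```

A caller could then write `await foreach (var item in connection.QueryStreamAsync<Item>(getItemsSql, new { Id = id })) { ... }`. The reader stays open until the loop finishes or the enumerator is disposed, so the connection has to outlive the enumeration.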
Original Answer:
Here's an IAsyncEnumerable wrapper I wrote that may help those who want to stream unbuffered data using async/await and also want the power of Dapper's type mapping:
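public class ReaderParser<T> : IAsyncEnumerable<T> {
    public ReaderParser(SqlDataReader reader) {
        Reader = reader;
    }

    private SqlDataReader Reader { get; }

    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) {
        // Pass the token along so that reads can be cancelled.
        return new ReaderParserEnumerator<T>(Reader, cancellationToken);
    }
}

public class ReaderParserEnumerator<T> : IAsyncEnumerator<T> {
    public ReaderParserEnumerator(SqlDataReader reader, CancellationToken cancellationToken = default) {
        Reader = reader;
        CancellationToken = cancellationToken;
        // Dapper builds the row-to-T mapper from the reader's columns.
        RowParser = reader.GetRowParser<T>();
    }

    // Parses the row the reader is currently positioned on.
    public T Current => Reader.FieldCount == 0 ? default(T) : RowParser(Reader);

    private SqlDataReader Reader { get; }
    private CancellationToken CancellationToken { get; }
    private Func<IDataReader, T> RowParser { get; }

    // Disposing the enumerator also disposes the underlying reader.
    public async ValueTask DisposeAsync() {
        await Reader.DisposeAsync();
    }

    public async ValueTask<bool> MoveNextAsync() {
        return await Reader.ReadAsync(CancellationToken);
    }
}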
Usage:
var reader = await command.ExecuteReaderAsync();
return new ReaderParser<T>(reader);
And then the System.Linq.Async package adds basically all the nice IEnumerable extensions you know and love, e.g. in my usage:
var streamData = await repo.GetDataStream();
var buffer = await streamData.Take(BATCH_SIZE).ToListAsync();
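One thing to keep in mind with the usage above: as far as I can tell, each `Take(...).ToListAsync()` call starts a fresh enumeration through `GetAsyncEnumerator`, so calling it repeatedly is not a way to walk one stream batch by batch. If the goal is to process the whole stream in batches, a single-pass helper is one option. The following is a minimal sketch using only the base class library. `AsyncBatching.Batch` and `Demo.Numbers` are hypothetical names introduced for illustration, and `Numbers` merely stands in for a database stream.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncBatching
{
    // Hypothetical helper: groups a stream into lists of up to `size` items
    // while enumerating the source exactly once.
    public static async IAsyncEnumerable<List<T>> Batch<T>(
        this IAsyncEnumerable<T> source, int size)
    {
        // Because this is an iterator, the check runs when enumeration starts.
        if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));

        var batch = new List<T>(size);
        await foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == size)
            {
                yield return batch;
                batch = new List<T>(size);
            }
        }

        // Emit the final, possibly partial, batch.
        if (batch.Count > 0) yield return batch;
    }
}

public static class Demo
{
    // Stand-in for a database stream: yields 1..count asynchronously.
    public static async IAsyncEnumerable<int> Numbers(int count)
    {
        for (var i = 1; i <= count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    public static async Task Main()
    {
        // Prints "1,2", then "3,4", then "5".
        await foreach (var batch in Numbers(5).Batch(2))
        {
            Console.WriteLine(string.Join(",", batch));
        }
    }
}
```

With the repository from the usage above, the same helper would be called as `await foreach (var batch in streamData.Batch(BATCH_SIZE)) { ... }`.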