Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using IAsyncEnumerable with Dapper

We have recently migrated our ASP.NET Core API which uses Dapper to .NET Core 3.1. After the migration, we felt there was an opportunity to use the latest IAsyncEnumerable feature from C# 8 for one of our endpoints.

Here is the pseudocode before the changes:

public async Task<IEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       {
           Id = id
       });

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
       return null;
    }

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
} 

private IEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
        foreach (var item in items)
        {
            yield return item;
        }
    }     
}

After IAsyncEnumerable code changes:

// Import Nuget pacakage: System.Linq.Async

public async Task<IAsyncEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       {
           Id = id
       });

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
        return null;
    }

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
} 

private IAsyncEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
       await foreach (var item in items.ToAsyncEnumerable())
       {
           yield return item;
       }
    }
 }

The above approach is to use ToAsyncEnumerable is loosely inspired from this post, but I'm not 100% sure if I'm using it in the right place/ context.

Question:

  • The dapper library only returns IEnumerable but can we use ToAsyncEnumerable to convert it into IAsyncEnumerable for async stream like above?

Note: This question looks similar to What happens with returning IEnumerable if used with async/await (streaming data from SQL Server with Dapper)? but I do not think that answers my question.

like image 722
Ankit Vijay Avatar asked Jan 28 '20 20:01

Ankit Vijay


1 Answers

Update: I wasn't aware of async iterators when I first wrote this answer. Thanks to Theodor Zoulias for pointing it out. In light of that, a much simpler approach is possible:

using var reader = await connection.ExecuteReaderAsync(query, parameters);
var rowParser = reader.GetRowParser<T>();

// Consider using reader.NextResultAsync(). Follow github issue for details:

while (await reader.ReadAsync()) {
    yield return rowParser(reader);
}

Ref: https://github.com/DapperLib/Dapper/issues/1239#issuecomment-1035507322

Original Answer:

Here's an IAsyncEnumerable wrapper I wrote that may help those who want to stream unbuffered data using async/await and also want the power of Dapper's type mapping:

public class ReaderParser<T> : IAsyncEnumerable<T> {
    public ReaderParser(SqlDataReader reader) {
        Reader = reader;
    }
    private SqlDataReader Reader { get; }
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) {
        return new ReaderParserEnumerator<T>(Reader);
    }
}
public class ReaderParserEnumerator<T> : IAsyncEnumerator<T> {
    public ReaderParserEnumerator(SqlDataReader reader) {
        Reader = reader;
        RowParser = reader.GetRowParser<T>();
    }
    public T Current => Reader.FieldCount == 0 ? default(T) : RowParser(Reader);
    private SqlDataReader Reader { get; }
    private Func<IDataReader, T> RowParser { get; }
    public async ValueTask DisposeAsync() {
        await Reader.DisposeAsync();
    }
    public async ValueTask<bool> MoveNextAsync() {
        return await Reader.ReadAsync();
    }
}

Usage:

var reader = await command.ExecuteReaderAsync();
return new ReaderParser<T>(reader);

And then, package System.Linq.Async adds basically all the nice IEnumerable extensions you know and love, e.g. in my usage:

var streamData = await repo.GetDataStream();
var buffer = await streamData.Take(BATCH_SIZE).ToListAsync();
like image 71
Dave Avatar answered Oct 16 '22 15:10

Dave