async Task<IEnumerable> with yield return?

Tags:

c#

async-await

The below method doesn't compile. Alternatives?

public static async Task<IEnumerable<object[]>> GetRecordsAsync(
    this Transaction transaction,
    string commandText,
    params SqlParameter[] parameters)
{
    // Get a SqlDataReader
    var reader = await transaction.GetReaderAsync(commandText, parameters);
    var fieldCount = -1;
    // Begin iterating through records asynchronously
    while (await reader.ReadAsync()) // Note: each iteration waits for ReadAsync to report whether another record exists
    {
        // Grab all the field values out
        if (fieldCount < 0)
            fieldCount = reader.FieldCount;
        var fields = new object[fieldCount];
        reader.GetValues(fields);
        // Yield return the field values from this record
        yield return fields;
    }
}

Error message:

The body of 'TransactionExtensions.GetRecordsAsync(Transaction, string, params SqlParameter[])' cannot be an iterator block because 'Task<IEnumerable<object[]>>' is not an iterator interface type

I don't see a way to adapt this answer to a similar-sounding (but different) question, because I don't know a priori how many times the loop will iterate.

Edit: fixed formatting

Matt Thomas asked Mar 19 '17 01:03


People also ask

Is yield return async?

Using yield return in an asynchronous method requires the method to be an async iterator, combining async/await with yield. An ordinary async method returns a Task, so a natural first thought is to declare the return type as Task<IEnumerable<T>>. That does not compile, because Task<IEnumerable<T>> is not an iterator type. Since C# 8.0, the method can return IAsyncEnumerable<T> instead.
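If the caller can afford to wait for every record before processing any of them, one way to keep a Task-returning signature is to drop yield return and buffer the rows into a list. The following is only a sketch under assumptions: it is written against DbDataReader (the base class of SqlDataReader) so that it has no external dependencies, and the names BufferedRecordsSketch and GetAllRecordsAsync are hypothetical.

```csharp
using System.Collections.Generic;
using System.Data.Common;
using System.Threading.Tasks;

public static class BufferedRecordsSketch
{
    // Hypothetical helper: reads every remaining record into memory, then returns them all at once.
    // Nothing is yielded lazily, so this is an ordinary async method rather than an iterator.
    public static async Task<IReadOnlyList<object[]>> GetAllRecordsAsync(this DbDataReader reader)
    {
        var records = new List<object[]>();
        while (await reader.ReadAsync())
        {
            // Copy the current row's field values into a fresh array
            var fields = new object[reader.FieldCount];
            reader.GetValues(fields);
            records.Add(fields);
        }
        return records;
    }
}
```

The trade-off is memory: the whole result set is held at once, which may not suit large queries.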

What is yield return C#?

The yield return statement returns one element at a time from an iterator method. A method that uses yield must have a return type of IEnumerable, IEnumerable<T>, IEnumerator, or IEnumerator<T> (or, since C# 8.0, IAsyncEnumerable<T> or IAsyncEnumerator<T> for async iterators). The yield break statement ends the iteration. An iterator method can be consumed with a foreach loop or a LINQ query.
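As a small illustration of the synchronous case, here is a sketch of an iterator method. The names IteratorSketch and CountFrom, and the cutoff of 1000, are made up for the example.

```csharp
using System.Collections.Generic;

public static class IteratorSketch
{
    // Hypothetical helper: lazily yields up to 'count' consecutive integers beginning at 'start',
    // but never yields a value greater than 1000.
    public static IEnumerable<int> CountFrom(int start, int count)
    {
        for (var i = 0; i < count; i++)
        {
            if (start + i > 1000)
                yield break; // End the sequence early

            yield return start + i; // Hand one element to the caller, then pause here
        }
    }
}
```

Nothing in the method body runs until the caller starts enumerating, for example with foreach (var n in IteratorSketch.CountFrom(3, 3)).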

What is the difference between task and ValueTask?

Task is a reference type, so each new Task instance is allocated on the heap. ValueTask is a struct, so returning one that wraps an already-available result avoids that allocation. This can improve performance when the operation frequently completes synchronously.
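A rough sketch of where that difference shows up is a cached lookup that usually completes synchronously. It assumes a runtime where ValueTask<T> is available (it ships with .NET Core 2.1 and later). The class CachedLookupSketch and its members are hypothetical, and the dictionary is not thread-safe, so treat this as an illustration only.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class CachedLookupSketch
{
    private readonly Dictionary<string, int> _cache = new Dictionary<string, int>();

    // Returns the cached value without allocating a Task when the key has been seen before.
    public ValueTask<int> GetLengthAsync(string key)
    {
        if (_cache.TryGetValue(key, out var cached))
            return new ValueTask<int>(cached); // Already-available result wrapped in a struct

        return new ValueTask<int>(LoadAsync(key)); // Slow path: wraps a real Task
    }

    private async Task<int> LoadAsync(string key)
    {
        await Task.Yield(); // Stand-in for real asynchronous I/O
        var value = key.Length;
        _cache[key] = value;
        return value;
    }
}
```

A ValueTask should generally be awaited only once. Callers that need to await it several times can convert it with AsTask().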

What is an IAsyncEnumerable?

IAsyncEnumerable<T> exposes an enumerator that has a MoveNextAsync() method that can be awaited. This means a method that produces this result can make asynchronous calls in between yielding results.
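Applied to a data reader like the one in the question, an async iterator might look like the sketch below. It assumes C# 8.0 or later and is written against DbDataReader (the base class of SqlDataReader) to stay dependency-free. The class name DataReaderAsyncStreamSketch is hypothetical, and this is not the asker's Transaction extension itself.

```csharp
using System.Collections.Generic;
using System.Data.Common;
using System.Runtime.CompilerServices;
using System.Threading;

public static class DataReaderAsyncStreamSketch
{
    // Streams each record's field values as soon as it has been read.
    public static async IAsyncEnumerable<object[]> GetRecordsAsync(
        this DbDataReader reader,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Await between yields: the next row is fetched only when the consumer asks for it
        while (await reader.ReadAsync(cancellationToken))
        {
            var fields = new object[reader.FieldCount];
            reader.GetValues(fields);
            yield return fields;
        }
    }
}
```

A consumer would then write something like await foreach (var fields in reader.GetRecordsAsync()) { /* process each record */ }, without needing to know in advance how many records there are.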


1 Answer

Based on @SLaks's comment to the question, here's a general alternative using Reactive Extensions:

/// <summary>
/// Turns the given asynchronous functions into an IObservable
/// </summary>
static IObservable<T> ToObservable<T>(
    Func<Task<bool>> shouldLoopAsync,
    Func<Task<T>> getAsync)
{
    return Observable.Create<T>(
        observer => Task.Run(async () =>
            {
                while (await shouldLoopAsync())
                {
                    var value = await getAsync();
                    observer.OnNext(value);
                }
                observer.OnCompleted();
            }
        )
    );
}

Example usage, tailored to solve the question's specific case:

/// <summary>
/// Asynchronously processes each record of the given reader using the given handler
/// </summary>
static async Task ProcessResultsAsync(this SqlDataReader reader, Action<object[]> fieldsHandler)
{
    // Set up async functions for the reader
    var shouldLoopAsync = (Func<Task<bool>>)reader.ReadAsync;
    var getAsync = new Func<SqlDataReader, Func<Task<object[]>>>(_reader =>
    {
        var fieldCount = -1;
        return () => Task.Run(() =>
        {
            Interlocked.CompareExchange(ref fieldCount, _reader.FieldCount, -1);
            var fields = new object[fieldCount];
            _reader.GetValues(fields);
            return fields;
        });
    })(reader);

    // Turn the async functions into an IObservable
    var observable = ToObservable(shouldLoopAsync, getAsync);

    // Process the fields as they become available
    var finished = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); // Completes when the observable completes or fails
    using (observable.Subscribe(
        onNext: fieldsHandler, // Invoke the handler for each set of fields
        onError: ex => finished.TrySetException(ex), // Surface failures to the caller instead of waiting forever
        onCompleted: () => finished.TrySetResult(true) // Signal that the observable completed
    )) // Don't forget best practice of disposing IDisposables
        // Asynchronously wait for completion without blocking a thread
        await finished.Task;
}

(Note that getAsync could be simplified in the above code block, but I like how explicit it is about the closure that's being created)

...and finally:

// Get a SqlDataReader
var reader = await transaction.GetReaderAsync(commandText, parameters);
// Do something with the records
await reader.ProcessResultsAsync(fields => { /* Code here to process each record */ });

Matt Thomas answered Oct 12 '22 23:10