
How can I make `await …` work with `yield return` (i.e. inside an iterator method)?

I have existing code that looks similar to:

IEnumerable<SomeClass> GetStuff()
{
    using (SqlConnection conn = new SqlConnection(connectionString))
    using (SqlCommand cmd = new SqlCommand(sql, conn))
    {
        conn.Open();
        SqlDataReader reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            SomeClass someClass = f(reader); // create instance based on returned row
            yield return someClass;
        }
    } 
}

It seems I could benefit by using reader.ReadAsync(). However, if I just modify the one line:

        while (await reader.ReadAsync())

the compiler informs me that await can only be used in methods marked with async, and suggests I modify the method signature to be:

async Task<IEnumerable<SomeClass>> GetStuff()

However, doing that makes GetStuff() unusable because:

The body of GetStuff() cannot be an iterator block because Task<IEnumerable<SomeClass>> is not an iterator interface type.

I'm sure I am missing a key concept with the async programming model.

Questions:

  • Can I use ReadAsync() in my iterator? How?
  • How can I think about the async paradigm differently so that I understand how it works in this type of situation?
asked Oct 30 '12 00:10 by Eric J.


2 Answers

The problem is what you're asking doesn't actually make much sense. IEnumerable<T> is a synchronous interface, and returning Task<IEnumerable<T>> isn't going to help you much, because some thread would have to block waiting for each item, no matter what.

What you actually want to return is some asynchronous alternative to IEnumerable<T>: something like IObservable<T>, a dataflow block from TPL Dataflow, or IAsyncEnumerable<T>, which was added in C# 8.0/.NET Core 3.0. (For earlier versions, there are some libraries that contain it.)
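Where C# 8.0 or later is available, `await` and `yield return` can share one method body if the method returns IAsyncEnumerable<T>. The following is only a minimal sketch of that shape, not the question's actual data access code: `FakeReader` is a hypothetical stand-in for SqlDataReader so that the example runs without a database, and `AsyncIteratorDemo` and `CollectAsync` are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for SqlDataReader so the sketch runs without a database.
sealed class FakeReader : IAsyncDisposable
{
    private readonly string[] _rows;
    private int _index = -1;

    public FakeReader(params string[] rows) => _rows = rows;

    public string Current => _rows[_index];

    // Mimics the shape of ReadAsync(): true while a row is available.
    public async Task<bool> ReadAsync()
    {
        await Task.Yield(); // simulate asynchronous I/O
        return ++_index < _rows.Length;
    }

    public ValueTask DisposeAsync() => default;
}

static class AsyncIteratorDemo
{
    // An async iterator: 'await' and 'yield return' in the same method body.
    public static async IAsyncEnumerable<string> GetStuff()
    {
        await using var reader = new FakeReader("a", "b", "c");
        while (await reader.ReadAsync())
        {
            yield return reader.Current;
        }
    }

    // The caller consumes the sequence with 'await foreach'.
    public static async Task<List<string>> CollectAsync()
    {
        var items = new List<string>();
        await foreach (string item in GetStuff())
        {
            items.Add(item);
        }
        return items;
    }
}
```

With a real SqlDataReader the loop would presumably look the same, with `f(reader)` from the question producing each item. The reader stays open until the caller finishes or abandons the `await foreach`.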

Using TPL Dataflow, one way to do this would be:

ISourceBlock<SomeClass> GetStuff() {
    var block = new BufferBlock<SomeClass>();

    Task.Run(async () =>
    {
        using (SqlConnection conn = new SqlConnection(connectionString))
        using (SqlCommand cmd = new SqlCommand(sql, conn))
        {
            await conn.OpenAsync();
            SqlDataReader reader = await cmd.ExecuteReaderAsync();
            while (await reader.ReadAsync())
            {
                // Create an instance of SomeClass based on the row returned
                // (f is the factory function from the question).
                SomeClass someClass = f(reader);
                block.Post(someClass);
            }
            block.Complete();
        } 
    });

    return block;
}

You'll probably want to add error handling to the above code, but otherwise, it should work and it will be completely asynchronous.

The rest of your code would then consume items from the returned block also asynchronously, probably using ActionBlock.
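As a rough sketch of that consuming side, the example below links a source block to an ActionBlock and awaits its completion. `Produce` is a hypothetical stand-in for GetStuff() that posts three integers instead of database rows, and `DataflowConsumerDemo` and `ConsumeAsync` are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class DataflowConsumerDemo
{
    // Hypothetical producer standing in for GetStuff(): posts three items, then completes.
    static ISourceBlock<int> Produce()
    {
        var block = new BufferBlock<int>();
        Task.Run(async () =>
        {
            for (int i = 1; i <= 3; i++)
            {
                await Task.Delay(10); // simulate waiting for a row
                block.Post(i);
            }
            block.Complete();
        });
        return block;
    }

    public static async Task<List<int>> ConsumeAsync()
    {
        var seen = new List<int>();

        // ActionBlock handles one item at a time by default, so the list needs no lock.
        var consumer = new ActionBlock<int>(item => seen.Add(item));

        // PropagateCompletion forwards completion (and faults) from the source to the consumer.
        Produce().LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

        await consumer.Completion; // finishes once every posted item has been handled
        return seen;
    }
}
```

Without PropagateCompletion the consumer would never complete on its own, so awaiting `consumer.Completion` would hang.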

answered Oct 18 '22 09:10 by svick


No, you can't use await inside an iterator block that returns IEnumerable<T>. As svick says, you would need something like IAsyncEnumerable<T> to do that.

If you have the return value Task<IEnumerable<SomeClass>> it means that the function returns a single Task object that, once completed, will provide you with a fully formed IEnumerable (no room for Task asynchrony in this enumerable). Once the task object is complete, the caller should be able to synchronously iterate through all the items it returned in the enumerable.

Here is a solution that returns Task<IEnumerable<SomeClass>>. You could get a large part of the benefit of async by doing something like this:

async Task<IEnumerable<SomeClass>> GetStuff()
{
    using (SqlConnection conn = new SqlConnection(""))
    {
        using (SqlCommand cmd = new SqlCommand("", conn))
        {
            await conn.OpenAsync();
            SqlDataReader reader = await cmd.ExecuteReaderAsync();
            return ReadItems(reader).ToArray();
        }
    }
}

IEnumerable<SomeClass> ReadItems(SqlDataReader reader)
{
    while (reader.Read())
    {
        // Create an instance of SomeClass based on row returned.
        SomeClass someClass = null;
        yield return someClass;
    }
}

...and an example usage:

async Task Caller()
{
    // Calls get-stuff, which returns immediately with a Task
    Task<IEnumerable<SomeClass>> itemsAsync = GetStuff();
    // Wait for the task to complete so we can get the items
    IEnumerable<SomeClass> items = await itemsAsync;
    // Iterate synchronously through the items which are all already present
    foreach (SomeClass item in items)
    {
        Console.WriteLine(item);
    }
}

Here you have the iterator part and the async part in separate functions which allows you to use both the async and yield syntax. The GetStuff function asynchronously acquires the data, and the ReadItems then synchronously reads the data into an enumerable.

Note the ToArray() call. Something like this is necessary because the enumerator function executes lazily and so your async function may otherwise dispose the connection and command before all the data is read. This is because the using blocks cover the duration of the Task execution, but you would be iterating it after the task is complete.
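The effect can be reproduced without a database. In this sketch, `FakeResource` is a hypothetical stand-in for the connection and reader, and `GetLazy` and `GetEager` are illustrative names for the two variants.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical resource standing in for the connection/reader pair.
sealed class FakeResource : IDisposable
{
    public bool Disposed { get; private set; }
    public void Dispose() => Disposed = true;

    public int Read(int i)
    {
        if (Disposed) throw new ObjectDisposedException(nameof(FakeResource));
        return i;
    }
}

static class LazyIteratorDemo
{
    static IEnumerable<int> ReadItems(FakeResource resource)
    {
        for (int i = 0; i < 3; i++)
            yield return resource.Read(i); // runs only when the caller enumerates
    }

    // Returns the lazy sequence: nothing has been read when 'using' disposes the resource.
    public static IEnumerable<int> GetLazy()
    {
        using (var resource = new FakeResource())
            return ReadItems(resource);
    }

    // Materializes the items inside the 'using' block, like the ToArray() call in GetStuff().
    public static IEnumerable<int> GetEager()
    {
        using (var resource = new FakeResource())
            return ReadItems(resource).ToArray();
    }
}
```

Enumerating the result of `GetLazy()` throws ObjectDisposedException on the first item, because the iterator body only starts running after the resource is gone. `GetEager()` returns all three items.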

This solution does not use ReadAsync, but it does use OpenAsync and ExecuteReaderAsync, which probably gives you most of the benefit. In my experience it is ExecuteReader that takes the most time and benefits most from being async. By the time I've read the first row, the SqlDataReader already has all the other rows and ReadAsync just returns synchronously. If this is the case for you as well, then you won't get a significant benefit from moving to a push-based system like IObservable<T> (which would require significant modifications to the calling function).

For illustration, consider an alternative approach to the same issue:

IEnumerable<Task<SomeClass>> GetStuff()
{
    using (SqlConnection conn = new SqlConnection(""))
    {
        using (SqlCommand cmd = new SqlCommand("", conn))
        {
            conn.Open();
            SqlDataReader reader = cmd.ExecuteReader();
            while (true)
                yield return ReadItem(reader);
        }
    }
}

async Task<SomeClass> ReadItem(SqlDataReader reader)
{
    if (await reader.ReadAsync())
    {
        // Create an instance of SomeClass based on row returned.
        SomeClass someClass = null;
        return someClass;
    }
    else
        return null; // Mark end of sequence
}

...and an example usage:

async Task Caller()
{
    // Synchronously get a list of Tasks
    IEnumerable<Task<SomeClass>> items = GetStuff();
    // Iterate through the Tasks
    foreach (Task<SomeClass> itemAsync in items)
    {
        // Wait for the task to complete. We need to wait for 
        // it to complete before we can know if it's the end of
        // the sequence
        SomeClass item = await itemAsync;
        // End of sequence?
        if (item == null) 
            break;
        Console.WriteLine(item);
    }
}

In this case, GetStuff returns immediately with an enumerable, where each item in the enumerable is a task that will produce a SomeClass object when it completes. This approach has a few flaws. Firstly, the enumerable returns synchronously, so at the time it returns we don't yet know how many rows are in the result, which is why I made it an infinite sequence. This is perfectly legal, but it has some side effects: I needed to use null to signal the end of useful data in the infinite sequence of tasks. Secondly, you have to be careful about how you iterate it. You need to iterate it forwards, and you need to wait for each row before moving on to the next row. You must also dispose of the iterator only after all the tasks have completed, so that the connection isn't disposed before it has finished being used. For these reasons this is not a safe solution, and I must emphasize that I'm including it only for illustration, to help answer your second question.

answered Oct 18 '22 09:10 by Mike