 

Unexpected behaviour with Microsoft.EntityFrameworkCore.EntityFrameworkQueryableExtensions.ForEachAsync<T>()

Here are the steps to reproduce. The program below copies 10,000 rows from one SQL Server table to another using a .NET Core console app and EF Core. It inserts the records in 100 batches, and (this is important!) it creates a new DbContext instance for each batch.

1) Create a SQL Server database and the "Froms" and "Tos" tables:

create table Froms (
    Id int identity(1, 1) not null,
    Guid [uniqueidentifier] not null,

    constraint [PK_Froms] primary key clustered (Id asc)
)
go

create table Tos (
    Id int not null,
    Guid [uniqueidentifier] not null,

    constraint [PK_Tos] primary key clustered (Id asc)
)
go

2) Populate the "Froms" table:

set nocount on
declare @i int = 0

while @i < 10000
begin
    insert Froms (Guid)
    values (newid())

    set @i += 1
end
go

3) Create a .NET Core console app project named TestForEachAsync. Set the C# language version to 7.1 or later (required for async Main). Add the Microsoft.EntityFrameworkCore.SqlServer NuGet package.

4) Create classes:

Database entities

using System;

namespace TestForEachAsync
{
    public class From
    {
        public int Id { get; set; }
        public Guid Guid { get; set; }
    }
}

using System;

namespace TestForEachAsync
{
    public class To
    {
        public int Id { get; set; }
        public Guid Guid { get; set; }
    }
}

DbContext

using Microsoft.EntityFrameworkCore;

namespace TestForEachAsync
{
    public class Context : DbContext
    {
        public DbSet<From> Froms { get; set; }
        public DbSet<To> Tos { get; set; }

        protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
        {
            optionsBuilder.UseSqlServer("YOUR_CONNECTION_STRING");
        }
    }
}

Main

using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

namespace TestForEachAsync
{
    internal class Program
    {
        private static async Task Main(string[] args)
        {
            //Get the "froms"
            var selectContext = new Context();
            var froms = selectContext.Froms.Select(f => new { f.Id, f.Guid });

            int count = 0;
            Task<int> saveChangesTask = null;
            Context insertContext = new Context();
            Context prevInsertContext = null;

            //Iterate through "froms"
            await froms.ForEachAsync(
                async f =>
                {
                    //Add instance of "to" to the context
                    var to = new To { Id = f.Id, Guid = f.Guid };
                    await insertContext.Tos.AddAsync(to);
                    count++;

                    //If another 100 "to"s have been added to the context...
                    if (count % 100 == 0)
                    {
                        //Wait for the previous 100 "to"s to finish saving to the database
                        if (saveChangesTask != null)
                        {
                            await saveChangesTask;
                        }

                        //Start saving the next 100 "to"s
                        saveChangesTask = insertContext.SaveChangesAsync();

                        //Dispose of the context that was used to save previous 100 "to"s
                        prevInsertContext?.Dispose();

                        //Reassign the context used to save the current 100 "to"s to a "prev" variable,
                        //and set context variable to the new Context instance.
                        prevInsertContext = insertContext;
                        insertContext = new Context();
                    }
                }
            );

            //Wait for second last 100 "to"s to finish saving to the database
            if (saveChangesTask != null)
            {
                await saveChangesTask;
            }

            //Save the last 100 "to"s to the database
            await insertContext.SaveChangesAsync();
            insertContext.Dispose();

            Console.WriteLine("Done");
            Console.ReadKey();
        }
    }
}

5) Run the app: you get the exception "The connection does not support MultipleActiveResultSets". It looks like multiple operations are being started on insertContext, though I do not see why.

6) I found two ways to fix the issue:

  • Replace the await froms.ForEachAsync(...) loop with a "normal" loop, foreach (var f in froms) {...}, or
  • Inside the async loop, replace await saveChangesTask; with saveChangesTask.Wait();

Can someone please explain why the original code does not work as I expect?

Note: if you run the app multiple times, do not forget to truncate the "Tos" table before each run.

Andrew asked Jun 19 '18 04:06


1 Answer

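You are falling into the typical trap of passing an async lambda to a method that expects a delegate returning void (Action<T> in this particular case), as described by Stephen Toub in "Potential pitfalls to avoid when passing around async lambdas". It is effectively the same as using async void, with all of its pitfalls: your async code is simply not awaited, which breaks its internal logic. ForEachAsync moves on to the next row as soon as the lambda reaches an await that has not completed yet (await saveChangesTask here), so more rows are added, and more SaveChangesAsync calls can be started, while earlier operations are still in flight.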
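To see the effect without a database, here is a minimal, self-contained sketch (not EF Core code). The helpers ForEachWithAction and ForEachWithFunc are hypothetical and only mimic the two delegate shapes, and FakeSaveAsync is a hypothetical stand-in for SaveChangesAsync that uses Task.Delay in place of the database round-trip. It records how many fake saves are in flight at the same time:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncLambdaPitfall
{
    // Same delegate shape as EF Core's ForEachAsync(Action<T>): the callback returns void,
    // so an async lambda passed here becomes "async void" and nobody awaits it.
    public static Task ForEachWithAction<T>(IEnumerable<T> source, Action<T> action)
    {
        foreach (var item in source)
            action(item); // returns as soon as the lambda hits its first incomplete await
        return Task.CompletedTask;
    }

    // Func<T, Task> shape: each returned Task is awaited before the next item.
    public static async Task ForEachWithFunc<T>(IEnumerable<T> source, Func<T, Task> action)
    {
        foreach (var item in source)
            await action(item);
    }

    private static readonly object Gate = new object();
    private static int _active, _maxActive;

    // Hypothetical stand-in for SaveChangesAsync: tracks how many calls overlap.
    private static async Task FakeSaveAsync()
    {
        lock (Gate) { _active++; _maxActive = Math.Max(_maxActive, _active); }
        await Task.Delay(50); // simulated database round-trip
        lock (Gate) { _active--; }
    }

    // Uses shared static counters, so do not run two of these concurrently.
    public static async Task<int> MaxOverlapAsync(bool useFunc)
    {
        lock (Gate) { _active = 0; _maxActive = 0; }
        var items = new[] { 1, 2, 3, 4, 5 };

        if (useFunc)
            await ForEachWithFunc(items, async i => await FakeSaveAsync());
        else
            await ForEachWithAction(items, async i => await FakeSaveAsync()); // async void!

        await Task.Delay(200); // let any fire-and-forget calls finish before reading
        lock (Gate) { return _maxActive; }
    }

    public static async Task Main()
    {
        Console.WriteLine($"Action<T>:     max overlapping saves = {await MaxOverlapAsync(false)}");
        Console.WriteLine($"Func<T, Task>: max overlapping saves = {await MaxOverlapAsync(true)}");
    }
}
```

With the Action<T> shape each invocation returns to the loop at its first incomplete await, so in a typical run all five fake saves start before any of them finishes; with Func<T, Task> at most one is in flight. The Task.Delay(200) at the end of MaxOverlapAsync is only there because the async void calls cannot be awaited, which is exactly the problem.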

The usual solution is a dedicated overload that accepts Func<T, Task> instead of Action<T>. EF Core should probably provide one (you might consider filing a request for it), but for now you can implement it yourself with something like this:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Extensions.Internal;

namespace Microsoft.EntityFrameworkCore
{
    public static class AsyncExtensions
    {
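        // Overload for queries: with this in scope, an async lambda binds to Func<T, Task>
        // instead of EF Core's ForEachAsync(Action<T>), so each call can be awaited.
        public static Task ForEachAsync<T>(this IQueryable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default) =>
            source.AsAsyncEnumerable().ForEachAsync(action, cancellationToken);

        // EF Core 2.x: IAsyncEnumerable<T> here is the System.Interactive.Async interface
        // (GetEnumerator / MoveNext(CancellationToken)).
        public static async Task ForEachAsync<T>(this IAsyncEnumerable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default)
        {
            using (var asyncEnumerator = source.GetEnumerator())
                while (await asyncEnumerator.MoveNext(cancellationToken))
                    await action(asyncEnumerator.Current); // finish this item before fetching the next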
        }
    }
}

which is basically the EF Core implementation with an added await on the action.
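The existing froms.ForEachAsync(async f => ...) call needs no change to pick up this helper because of C# overload resolution: an async lambda converts to both Action<T> and Func<T, Task>, and the compiler prefers the delegate type that has a return type. A small sketch with hypothetical Run overloads (not EF Core APIs) that only report which one was chosen:

```csharp
using System;
using System.Threading.Tasks;

public static class OverloadDemo
{
    // Hypothetical overloads with the same delegate shapes as EF Core's
    // ForEachAsync(Action<T>) and the Func<T, Task> helper; they only report the pick.
    public static string Run<T>(T item, Action<T> action) => "Action<T>";
    public static string Run<T>(T item, Func<T, Task> action) => "Func<T, Task>";

    // An async lambda converts to both, but the delegate with a return type wins.
    public static string PickForAsyncLambda() => Run(42, async x => await Task.Delay(x));

    // A void-bodied lambda cannot convert to Func<T, Task>, so only Action<T> applies.
    public static string PickForSyncLambda() => Run(42, x => Console.WriteLine(x));

    public static void Main()
    {
        Console.WriteLine(PickForAsyncLambda()); // Func<T, Task>
        Console.WriteLine(PickForSyncLambda());  // Action<T>
    }
}
```

Synchronous lambdas therefore keep binding to the Action<T> overload, so adding the helper does not change existing non-async callers.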

Once you do that, your code will resolve to this method and everything should work as expected.
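The helper above is written for EF Core 2.x. From EF Core 3.0 on, AsAsyncEnumerable() returns the BCL System.Collections.Generic.IAsyncEnumerable<T>, whose enumerator is obtained with GetAsyncEnumerator() and advanced with MoveNextAsync(), so the GetEnumerator()/MoveNext(cancellationToken) calls above no longer apply. The sketch below assumes .NET Core 3.0+ and C# 8 and uses await foreach instead; FakeRowsAsync is a hypothetical stand-in for a query so the example runs without a database. With EF Core you would call it as froms.AsAsyncEnumerable().ForEachAsync(async f => ...).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
    // Sketch for .NET Core 3.0+ / C# 8, where IAsyncEnumerable<T> is the BCL interface.
    // Each action is awaited before the next item is requested from the source.
    public static async Task ForEachAsync<T>(
        this IAsyncEnumerable<T> source,
        Func<T, Task> action,
        CancellationToken cancellationToken = default)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (action == null) throw new ArgumentNullException(nameof(action));

        await foreach (var item in source.WithCancellation(cancellationToken))
            await action(item);
    }
}

public static class AsyncEnumerableDemo
{
    // Hypothetical stand-in for query.AsAsyncEnumerable(): yields "rows" asynchronously.
    public static async IAsyncEnumerable<int> FakeRowsAsync(int count)
    {
        for (int i = 1; i <= count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    public static async Task<List<int>> CollectAsync(int count)
    {
        var seen = new List<int>();
        await FakeRowsAsync(count).ForEachAsync(async row =>
        {
            await Task.Delay(1); // simulated per-row async work (e.g. a save)
            seen.Add(row);
        });
        return seen;
    }

    public static async Task Main()
    {
        var rows = await CollectAsync(5);
        Console.WriteLine(string.Join(", ", rows)); // 1, 2, 3, 4, 5
    }
}
```

Because this extension is declared on IAsyncEnumerable<T> rather than IQueryable<T>, calling froms.ForEachAsync(async f => ...) directly on the query would still bind to EF Core's Action<T> overload; the explicit AsAsyncEnumerable() call is what routes it here.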

Ivan Stoev answered Oct 13 '22 00:10