Here are the steps to reproduce. The program below copies 10,000 rows from one SQL table to another using a .NET Core console app and EF Core. The program inserts records in batches of 100, and (this is important!) it creates a new instance of DbContext for each batch.
1) Create a SQL Server database with the "Froms" and "Tos" tables:
create table Froms (
Id int identity(1, 1) not null,
Guid [uniqueidentifier] not null,
constraint [PK_Froms] primary key clustered (Id asc)
)
go
create table Tos (
Id int not null,
Guid [uniqueidentifier] not null,
constraint [PK_Tos] primary key clustered (Id asc)
)
go
2) Populate the "Froms" table:
set nocount on
declare @i int = 0
while @i < 10000
begin
insert Froms (Guid)
values (newid())
set @i += 1
end
go
3) Create a .NET Core console app project named TestForEachAsync. Change the C# version to 7.1 or later (required for async Main). Add the Microsoft.EntityFrameworkCore.SqlServer NuGet package.
4) Create classes:
Database entities
using System;
namespace TestForEachAsync
{
public class From
{
public int Id { get; set; }
public Guid Guid { get; set; }
}
}
using System;
namespace TestForEachAsync
{
public class To
{
public int Id { get; set; }
public Guid Guid { get; set; }
}
}
DbContext
using Microsoft.EntityFrameworkCore;
namespace TestForEachAsync
{
public class Context : DbContext
{
public DbSet<From> Froms { get; set; }
public DbSet<To> Tos { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseSqlServer("YOUR_CONNECTION_STRING");
}
}
}
Main
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
namespace TestForEachAsync
{
internal class Program
{
private static async Task Main(string[] args)
{
//Get the "froms"
var selectContext = new Context();
var froms = selectContext.Froms.Select(f => new { f.Id, f.Guid });
int count = 0;
Task<int> saveChangesTask = null;
Context insertContext = new Context();
Context prevInsertContext = null;
//Iterate through "froms"
await froms.ForEachAsync(
async f =>
{
//Add an instance of "to" to the context
var to = new To { Id = f.Id, Guid = f.Guid };
await insertContext.Tos.AddAsync(to);
count++;
//If another 100 of "to"s has been added to the context...
if (count % 100 == 0)
{
//Wait for the previous 100 "to"s to finish saving to the database
if (saveChangesTask != null)
{
await saveChangesTask;
}
//Start saving the next 100 "to"s
saveChangesTask = insertContext.SaveChangesAsync();
//Dispose of the context that was used to save previous 100 "to"s
prevInsertContext?.Dispose();
//Reassign the context used to save the current 100 "to"s to a "prev" variable,
//and set context variable to the new Context instance.
prevInsertContext = insertContext;
insertContext = new Context();
}
}
);
//Wait for the most recently started batch of "to"s to finish saving to the database
if (saveChangesTask != null)
{
await saveChangesTask;
}
//Save any remaining "to"s that did not fill a complete batch of 100
await insertContext.SaveChangesAsync();
insertContext.Dispose();
Console.WriteLine("Done");
Console.ReadKey();
}
}
}
5) Run the app. You get the exception "The connection does not support MultipleActiveResultSets". It looks like multiple operations are being started on insertContext, though I do not see why.
6) I found two ways to fix the issue: replace the await froms.ForEachAsync(...) loop with a "normal" loop foreach (var f in froms) {...}, or replace await saveChangesTask; with saveChangesTask.Wait();
But can someone please explain why the original code does not work as I expect?
Note: if you run the app multiple times, do not forget to truncate the "Tos" table before each run.
You are falling into the typical trap of passing an async lambda to a method that expects a delegate returning void (Action<T> in this particular case), as described by Stephen Toub in Potential pitfalls to avoid when passing around async lambdas. It is really the equivalent of using async void, with its pitfalls: your async code is simply not awaited, which breaks its internal logic. ForEachAsync moves on to the next row as soon as the lambda reaches its first incomplete await, so invocations overlap and can end up using insertContext concurrently.
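The pitfall can be reproduced without EF Core. The following is a minimal sketch, not EF Core's code: AsyncLambdaDemo, ForEachWithAction and ForEachWithFunc are hypothetical names invented for illustration. The first helper accepts Action<T>, so the async lambda is compiled as async void and the loop finishes before any lambda body has completed; the second accepts Func<T, Task> and awaits each call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo type; none of these names come from EF Core.
public static class AsyncLambdaDemo
{
    // Accepts Action<T>: an async lambda passed here becomes async void,
    // so each call returns at the lambda's first incomplete await.
    public static void ForEachWithAction<T>(IEnumerable<T> source, Action<T> action)
    {
        foreach (var item in source)
            action(item);
    }

    // Accepts Func<T, Task>: the returned task can be awaited,
    // so each item is fully processed before the next one starts.
    public static async Task ForEachWithFunc<T>(IEnumerable<T> source, Func<T, Task> action)
    {
        foreach (var item in source)
            await action(item);
    }

    public static async Task<(int viaAction, int viaFunc)> RunAsync()
    {
        var items = new[] { 1, 2, 3 };

        int completedViaAction = 0;
        ForEachWithAction(items, async i =>
        {
            await Task.Delay(200);
            Interlocked.Increment(ref completedViaAction);
        });
        // Read immediately after the loop: the 200 ms delays are still pending.
        int seenAfterActionLoop = completedViaAction;

        int completedViaFunc = 0;
        await ForEachWithFunc(items, async i =>
        {
            await Task.Delay(200);
            Interlocked.Increment(ref completedViaFunc);
        });

        return (seenAfterActionLoop, completedViaFunc);
    }

    public static async Task Main()
    {
        var (viaAction, viaFunc) = await RunAsync();
        Console.WriteLine($"Completed when Action<T> loop returned: {viaAction}");
        Console.WriteLine($"Completed when Func<T, Task> loop returned: {viaFunc}");
    }
}
```

In the question's code the same thing happens at await saveChangesTask: the lambda returns to ForEachAsync at that point, and the rest of its body runs later, overlapping with the processing of the following rows.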
The solution is, as usual, a special overload that accepts Func<T, Task> instead of Action<T>. Probably it should have been provided by EF Core (you may consider posting a request for that), but for now you can implement it yourself with something like this:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore.Extensions.Internal;
namespace Microsoft.EntityFrameworkCore
{
public static class AsyncExtensions
{
public static Task ForEachAsync<T>(this IQueryable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default) =>
source.AsAsyncEnumerable().ForEachAsync(action, cancellationToken);
public static async Task ForEachAsync<T>(this IAsyncEnumerable<T> source, Func<T, Task> action, CancellationToken cancellationToken = default)
{
using (var asyncEnumerator = source.GetEnumerator())
while (await asyncEnumerator.MoveNext(cancellationToken))
await action(asyncEnumerator.Current);
}
}
}
which is basically the EF Core implementation with an added await of the action.
Once you do that, your code will resolve to this method and everything should work as expected.
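Why the call site picks the new overload without any change can be checked in isolation. This is a small sketch with hypothetical names (OverloadDemo, Run, Resolve), assuming only standard C# overload resolution: when both an Action<T> and a Func<T, Task> overload are applicable, an async lambda with no return value binds to the Func<T, Task> one.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo type used only to show which overload the compiler selects.
public static class OverloadDemo
{
    public static string Run<T>(T item, Action<T> action) => "Action<T>";

    public static string Run<T>(T item, Func<T, Task> action) => "Func<T, Task>";

    // The async lambda has an inferred return type of Task, so the
    // task-returning delegate is the better conversion than the void one.
    public static string Resolve() => Run(1, async i => { await Task.Yield(); });

    public static void Main()
    {
        Console.WriteLine(Resolve());
    }
}
```

This is the same rule that makes Task.Run(async () => ...) bind to its Func<Task> overload rather than the Action one.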