I have a project that has a SQL Server database back end and uses Dapper as an ORM. I am attempting to use Dapper's QueryAsync() method to get some data. On top of that, the call to my repo comes from inside several tasks that are run with Task.WhenAll (that is to say, each task involves getting data from that repo, so each task awaits my repo's method that wraps the QueryAsync() call).
The problem is that my SqlConnections never close, even though I am using a using block. As a result, I have 100+ open connections to my database, and I eventually start getting "max pool size reached" exceptions. The thing is, when I switch to plain Query() instead of QueryAsync(), it works fine, but I'd like to be able to do this asynchronously.
Here's a code example. I tried to mimic the structure of the actual application as best I could, which is why it looks more complex than it has to be.
Interface:
public interface IFooRepository<T> where T : FooBase
{
    Task<IEnumerable<T>> Select(string account, DateTime? effectiveDate = null);
}
Implementation:
public class FooRepository : RepositoryBase, IFooRepository<FuturePosition>
{
    private readonly IWebApiClientRepository _accountRepository;
    public FooRepository(IWebApiClientRepository repo)
    {
        _accountRepository = repo;
    }
    public async Task<IEnumerable<FuturePosition>> Select(string code, DateTime? effectiveDate = null)
    {
        effectiveDate = effectiveDate ?? DateTime.Today.Date;
        var referenceData = await _accountRepository.GetCrossRefferenceData(code, effectiveDate.Value);
        using (var connection = new SqlConnection("iamaconnectionstring"))
        {
            connection.Open();
            try
            {
                var res = await connection.QueryAsync<FuturePosition>(SqlQueryVariable + "AND t.code = @code;",
                    new
                    {
                        effectiveDate = effectiveDate.Value,
                        code = referenceData.Code
                    });
                foreach (var item in res)
                {
                    item.PropFromReference = referenceData.PropFromReference;
                }
                return res;
            }
            catch (Exception e)
            {
                //log
                throw;
            }
            finally
            {
                connection.Close();
            }
        }
    }
}
The calling code has two layers. I'll start with the outer one, which is where I think the problem is. See the comments in the code below.
Populator:
public class Populator : PopulatorBase
{
    private IAccountRepository _acctRepository;
    private ItemCreator _itemCreator;
    public override async Task<IEnumerable<ProcessResult>> ProcessAccount(DateTime? popDate = null)
    {
        //My attempt at throttling the async calls
        //I was hoping this would force a max of 10 simultaneous connections.
        //It did not work.
        SemaphoreSlim ss = new SemaphoreSlim(10, 10);
        var accountsToProcess = _acctRepository.GetAllAccountsToProcess();
        var accountNumbers = accountsToProcess.SelectMany(a => a.accountNumbers).ToList();
        List<Task<ProcessResult>> trackedTasks = new List<Task<ProcessResult>>();
        foreach (var item in accountNumbers)
        {
            await ss.WaitAsync();
            trackedTasks.Add(ProcessAccount(item.AccountCode, popDate ?? DateTime.Today));
            ss.Release();
        }
        //my gut tells me the issue is because of these tasks
        var results = await Task.WhenAll(trackedTasks);
        return results;
    }
    private async Task<ProcessResult> ProcessAccount(string accountCode, DateTime? popDate)
    {
        var createdItems = await _itemCreator.MakeItems(popDate, accountCode);
        return Populate(accountCode, createdItems);
    }
}
ItemCreator:
public class ItemCreator : ItemCreatorBase
{
    private readonly IFooRepository<FuturePosition> _fooRepository;
    private readonly IBarRepository<FuturePosition> _barRepository;
    public ItemCreator()
    {
        //standard constructor stuff
    }
    public async Task<ItemCreationResult> MakeItems(DateTime? effectiveDate, string account)
    {
        DateTime reconDate = effectiveDate ?? DateTime.Today.Date;
        //this uses the repository I outlined above
        var foos = await _fooRepository.Select(account, effectiveDate);
        //this repository uses a rest client, I doubt it's the problem
        var bars = await _barRepository.Select(account, effectiveDate);
        //just trying to make this example less lengthy
        var foobars = MakeFoobars(foos, bars);
        var result = new ItemCreationResult { EffectiveDate = effectiveDate, Items = foobars };
        return result;
    }
}
As far as what I've tried: calling connection.OpenAsync() in the repo instead of connection.Open(), and closing the connection explicitly in the finally block inside the using (as in the repo code above).
It's worth knowing that the foreach loop in the populator runs around 500 times. Essentially, there's a list of 500 accounts, and for each one it needs to run a long-running populate task, which involves pulling data from my Foo repo.
I honestly have no idea. I think it might have to do with awaiting my async db call from each task inside that list of tasks in the populator. Any insight into this issue would be super helpful.
After some digging, I think I managed to figure out the issue. I don't think I was actually experiencing a connection leak like I had originally assumed. From what I now understand, with connection pooling, when a SQL connection is closed from code, it doesn't actually go away -- it just returns to the connection pool as an idle connection, and the physical connection to the server stays open. Looking at the open connections on the SQL Server side will therefore still show it.
Since my data access was asynchronous, all of the connections were opened before any of them had been "closed" and returned to the pool, so the pool had to open a new physical connection for every request. That produced the startling number of open connections I saw and made me assume I had a connection leak.
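To make the pooling explanation concrete, here is a minimal C# sketch that simulates it without a database. It is a toy model under stated assumptions: FakeQueryAsync is a hypothetical stand-in for the repo call, "opening" a connection only increments a counter, "closing" it decrements the counter, and Task.Delay stands in for query latency. Because every call starts before any of them finishes, the peak count approaches the number of calls, which is how a real pool would blow through its default Max Pool Size of 100.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Toy model, NOT real ADO.NET: "open" bumps a counter, "close" decrements it.
object gate = new();
int open = 0, peak = 0;

// Hypothetical stand-in for one repo call: open, run the query, close (back to the pool).
async Task FakeQueryAsync()
{
    lock (gate) { open++; peak = Math.Max(peak, open); } // "open" a connection, track the peak
    await Task.Delay(50);                                // simulated query latency
    lock (gate) { open--; }                              // "close": the connection returns to the pool
}

// Start all 500 calls at once, like 500 account tasks fed straight into Task.WhenAll.
await Task.WhenAll(Enumerable.Range(0, 500).Select(_ => FakeQueryAsync()));

// Nearly every call was "open" at the same moment, so a real pool would have
// needed a separate physical connection for almost every one of them.
Console.WriteLine($"Peak simultaneous connections: {peak}");
```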
Using a SemaphoreSlim actually took care of the issue -- I just implemented it incorrectly. It should work like this:
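public override async Task<IEnumerable<ProcessResult>> ProcessAccount(DateTime? popDate = null)
{
    var ss = new SemaphoreSlim(10, 10);
    var accountsToProcess = _acctRepository.GetAllAccountsToProcess();
    var accountNumbers = accountsToProcess.SelectMany(a => a.accountNumbers).ToList();
    var trackedTasks = new List<Task<ProcessResult>>();
    foreach (var item in accountNumbers)
    {
        trackedTasks.Add(new Func<Task<ProcessResult>>(async () =>
        {
            // wait for a free slot before this account opens a connection
            await ss.WaitAsync().ConfigureAwait(false);
            try
            {
                return await ProcessAccount(item.AccountCode, popDate ?? DateTime.Today).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                //log, etc.
                throw;
            }
            finally
            {
                // free the slot only after this account's work (and its connection) is done
                ss.Release();
            }
        })());
    }
    return await Task.WhenAll(trackedTasks);
}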
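Doing this throttles the number of connections open at any one time: each task holds its semaphore slot until its work is finished and its connection has been returned to the pool, so only a small group of pooled connections (around the semaphore's limit of 10) is in use at once, and those same connections get reused.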
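The difference between the two semaphore placements can be checked with a similar toy simulation. Again this is a hypothetical sketch, not the real application: FakeProcessAccountAsync only counts "open connections" and uses Task.Delay in place of real I/O. In the question's loop, Release() runs as soon as ProcessAccount has handed back its Task (which happens at the task's first await), so the semaphore never limits anything. In the answer's version, each task holds its slot until its work completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Toy model, NOT real ADO.NET: "open" bumps a counter, "close" decrements it.
object gate = new();
int open = 0, peak = 0;

// Hypothetical stand-in for ProcessAccount: holds a "connection" while it works.
async Task<int> FakeProcessAccountAsync(int account)
{
    lock (gate) { open++; peak = Math.Max(peak, open); }
    await Task.Delay(20); // simulated query latency
    lock (gate) { open--; }
    return account;
}

// The question's loop: Release() runs right after the task is created,
// so every slot is free again immediately and nothing is throttled.
async Task<int> RunQuestionLoopAsync(int count)
{
    peak = 0;
    var ss = new SemaphoreSlim(10, 10);
    var tasks = new List<Task<int>>();
    for (int i = 0; i < count; i++)
    {
        await ss.WaitAsync();
        tasks.Add(FakeProcessAccountAsync(i));
        ss.Release();
    }
    await Task.WhenAll(tasks);
    return peak;
}

// The answer's loop: each task keeps its slot until its own work completes.
async Task<int> RunAnswerLoopAsync(int count)
{
    peak = 0;
    var ss = new SemaphoreSlim(10, 10);
    var tasks = Enumerable.Range(0, count).Select(async i =>
    {
        await ss.WaitAsync().ConfigureAwait(false);
        try { return await FakeProcessAccountAsync(i).ConfigureAwait(false); }
        finally { ss.Release(); }
    }).ToList();
    await Task.WhenAll(tasks);
    return peak;
}

int questionPeak = await RunQuestionLoopAsync(500);
int answerPeak = await RunAnswerLoopAsync(500);
Console.WriteLine($"Question's loop peak: {questionPeak}; answer's loop peak: {answerPeak}");
```

On .NET 6 or later, Parallel.ForEachAsync with ParallelOptions.MaxDegreeOfParallelism is another way to cap concurrency, though unlike Task.WhenAll it does not collect the results for you.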