We use IEnumerable to return huge datasets from the database:
    public IEnumerable<Data> Read(...)
    {
        using(var connection = new SqlConnection(...))
        {
            // ...
            while(reader.Read())
            {
                // ...
                yield return item;
            }
        }
    }
Now we want to use async methods to do the same. However, there is no async equivalent of IEnumerable, so we have to collect the data into a list until the entire dataset is loaded:
    public async Task<List<Data>> ReadAsync(...)
    {
        var result = new List<Data>();
        using(var connection = new SqlConnection(...))
        {
            // ...
            while(await reader.ReadAsync().ConfigureAwait(false))
            {
                // ...
                result.Add(item);
            }
        }
        return result;
    }
This will consume a huge amount of resources on the server, because all the data must be in the list before it is returned. What is the best and easiest-to-use async alternative to IEnumerable for working with large data streams? I would like to avoid storing all the data in memory while processing.
An asynchronous data stream is a stream of data where values are emitted, one after another, with a delay between them. The word asynchronous means that the data emitted can appear anywhere in time, after one second or even after two minutes, for example.
Async/await will only improve performance if it allows the machine to do additional work.
Yes, it is a best practice to make your API endpoints asynchronous, according to Microsoft's documentation. Asynchronous operations reduce the number of threads tied up in request handling: while one request is awaiting, the same thread can be used to process another request.
The main benefits of asynchronous programming using async/await include increased performance and responsiveness of your application, particularly when you have long-running operations that do not need to block execution.
The easiest option is using TPL Dataflow. All you need to do is configure an ActionBlock that handles the processing (in parallel if you wish) and "sends" the items into it one by one asynchronously. I would also suggest setting a BoundedCapacity, which will throttle the reader reading from the database when the processing can't keep up.
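    var block = new ActionBlock<Data>(
        data => ProcessDataAsync(data),
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 1000,
            MaxDegreeOfParallelism = Environment.ProcessorCount
        });

    using(var connection = new SqlConnection(...))
    {
        // ...
        while(await reader.ReadAsync().ConfigureAwait(false))
        {
            // ...
            // Waits asynchronously while the block is at BoundedCapacity.
            await block.SendAsync(item);
        }
    }

    // Signal that no more items are coming, then wait for queued items to finish.
    block.Complete();
    await block.Completion;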
You can also use Reactive Extensions, but that's a more complicated and robust framework than you probably need.
Most of the time when dealing with async/await methods, I find it easier to turn the problem around, and use functions (Func<...>) or actions (Action<...>) instead of ad-hoc code, especially with IEnumerable and yield.
In other words, when I think "async", I try to forget the old concept of function "return value" that is otherwise so obvious and that we are so familiar with.
For example, if you change your initial sync code into this (processor is the code that will ultimately do what you do with one Data item):
    public void Read(..., Action<Data> processor)
    {
        using(var connection = new SqlConnection(...))
        {
            // ...
            while(reader.Read())
            {
                // ...
                processor(item);
            }
        }
    }
Then, the async version is quite simple to write:
    public async Task ReadAsync(..., Action<Data> processor)
    {
        using(var connection = new SqlConnection(...))
        {
            // note you can use connection.OpenAsync()
            // and command.ExecuteReaderAsync() here
            while(await reader.ReadAsync())
            {
                // ...
                processor(item);
            }
        }
    }
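One limitation of an Action<Data> callback is that it cannot itself await anything. Here is a minimal sketch of a variant that takes a Func<Data, Task> instead, so each item can be processed asynchronously before the next row is read. The Data class, the CallbackReader name, and the simulated loop are hypothetical stand-ins for the elided database code, not part of the answer above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical item type standing in for the question's Data class.
public sealed class Data
{
    public int Id { get; set; }
}

public static class CallbackReader
{
    // Simulates the elided reader loop: "reads" `count` rows asynchronously
    // and hands each one to the processor before reading the next.
    public static async Task ReadAsync(int count, Func<Data, Task> processor)
    {
        for (var i = 0; i < count; i++)
        {
            // Stand-in for: await reader.ReadAsync()
            await Task.Yield();
            var item = new Data { Id = i };

            // Awaiting the callback keeps only one item in flight at a time,
            // so nothing accumulates in memory.
            await processor(item).ConfigureAwait(false);
        }
    }
}
```

A caller would pass an async lambda as the processor, for example one that awaits a write to another service for each item. Because the loop awaits each callback, a slow processor naturally slows down the reading.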
If you can change your code this way, you don't need any extension or extra library or IAsyncEnumerable stuff.
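For reference, since C# 8 (.NET Core 3.0) the language also has a direct async counterpart to the original yield return code: an async iterator that returns IAsyncEnumerable<T> and is consumed with await foreach. The following is a minimal sketch with the database access replaced by a simulated loop; the Data class and the StreamingReader and SumIdsAsync names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical item type standing in for the question's Data class.
public sealed class Data
{
    public int Id { get; set; }
}

public static class StreamingReader
{
    // Async iterator: yields items one at a time as they become available.
    public static async IAsyncEnumerable<Data> ReadAsync(int count)
    {
        for (var i = 0; i < count; i++)
        {
            // Stand-in for: await reader.ReadAsync()
            await Task.Yield();
            yield return new Data { Id = i };
        }
    }

    // Consumer: processes each item as it arrives, without buffering a list.
    public static async Task<int> SumIdsAsync(int count)
    {
        var sum = 0;
        await foreach (var item in ReadAsync(count))
        {
            sum += item.Id;
        }
        return sum;
    }
}
```

As with the synchronous version, the producer only advances when the consumer asks for the next item, so at most one item is held at a time.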