Async with huge data streams

We use `IEnumerable<T>` to return huge datasets from the database:

```csharp
public IEnumerable<Data> Read(...)
{
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(reader.Read())
        {
            // ...
            yield return item;
        }
    }
}
```

Now we want to do the same with async methods. However, there is no async counterpart of `IEnumerable<T>`, so we have to collect the data into a list until the entire dataset is loaded:

```csharp
public async Task<List<Data>> ReadAsync(...)
{
    var result = new List<Data>();
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(await reader.ReadAsync().ConfigureAwait(false))
        {
            // ...
            result.Add(item);
        }
    }
    return result;
}
```

This consumes a huge amount of memory on the server, because all the data must be in the list before the method returns. What is the best easy-to-use async alternative to `IEnumerable<T>` for working with large data streams? I would like to avoid holding all the data in memory while processing it.

user1224129 asked Jul 25 '14 23:07 (score 368)



2 Answers

The easiest option is to use TPL Dataflow. All you need to do is configure an `ActionBlock` that handles the processing (in parallel, if you wish) and "send" the items into it one by one asynchronously.
I would also suggest setting a `BoundedCapacity`, which throttles the reader pulling rows from the database when the processing can't keep up.

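```csharp
var block = new ActionBlock<Data>(
    data => ProcessDataAsync(data),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 1000,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    });

using(var connection = new SqlConnection(...))
{
    // ...
    while(await reader.ReadAsync().ConfigureAwait(false))
    {
        // ...
        // SendAsync waits asynchronously while the block is full (BoundedCapacity)
        await block.SendAsync(item).ConfigureAwait(false);
    }
}

// Signal that no more items are coming, then wait for the queued ones to finish
block.Complete();
await block.Completion.ConfigureAwait(false);
```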
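A minimal, self-contained sketch of the `ActionBlock` approach, runnable as a C# top-level program. To stay independent of a database it makes a few assumptions: `int` replaces `Data`, a plain `for` loop with `await Task.Yield()` stands in for the `SqlDataReader` loop, and the lambda body stands in for the hypothetical `ProcessDataAsync`. It shows the back-pressure from `BoundedCapacity` (the block's input queue never grows past the limit) and the `Complete()` / `Completion` handshake at the end. Depending on the target framework, the `System.Threading.Tasks.Dataflow` NuGet package may need to be referenced.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var processed = new ConcurrentBag<int>(); // thread-safe: several items run in parallel
int maxBuffered = 0;                      // largest input queue observed

var block = new ActionBlock<int>(
    async item =>
    {
        await Task.Delay(1);  // simulated slow, async processing (stand-in for ProcessDataAsync)
        processed.Add(item);
    },
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 100,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    });

// Simulated reader: 1000 "rows" produced one at a time
for (int i = 0; i < 1000; i++)
{
    await Task.Yield(); // stands in for await reader.ReadAsync()

    // SendAsync completes only once the block has room, so the reader is throttled
    if (!await block.SendAsync(i))
        throw new InvalidOperationException("The block declined the item.");

    maxBuffered = Math.Max(maxBuffered, block.InputCount);
}

block.Complete();        // no more items will be sent
await block.Completion;  // wait for the remaining items to be processed

Console.WriteLine($"Processed {processed.Count} items; largest queue: {maxBuffered}");
```

The producer never races ahead of the consumers by more than `BoundedCapacity` items, so memory use stays bounded regardless of how many rows the query returns.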

You can also use Reactive Extensions, but that's a more complicated and robust framework than you probably need.

i3arnon answered Sep 22 '22 15:09 (score 64)


Most of the time when dealing with async/await methods, I find it easier to turn the problem around and pass in functions (`Func<...>`) or actions (`Action<...>`) instead of writing ad-hoc code, especially with `IEnumerable` and `yield`.

In other words, when I think "async", I try to forget the familiar concept of a function's "return value".

For example, if you change your initial sync code into this (`processor` is the code that will ultimately do whatever you do with one `Data` item):

```csharp
public void Read(..., Action<Data> processor)
{
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(reader.Read())
        {
            // ...
            processor(item);
        }
    }
}
```

Then, the async version is quite simple to write:

```csharp
public async Task ReadAsync(..., Action<Data> processor)
{
    using(var connection = new SqlConnection(...))
    {
        // note you can use connection.OpenAsync()
        // and command.ExecuteReaderAsync() here
        while(await reader.ReadAsync())
        {
            // ...
            processor(item);
        }
    }
}
```
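An `Action<Data>` callback runs synchronously, so if the per-item work is itself asynchronous (writing to another store, calling a service), one possible variation, not part of the original answer, is to accept a `Func<Data, Task>` and await it inside the loop. The sketch below assumes that variation; `int` stands in for `Data`, and a `for` loop with `await Task.Yield()` stands in for the reader loop. `ReadAsync` here is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var received = new List<int>();

// Caller: the processor is async and is awaited for every item
await ReadAsync(async item =>
{
    await Task.Delay(1);  // hypothetical async per-item work
    received.Add(item);
});

Console.WriteLine($"Processed {received.Count} items");

// Hypothetical ReadAsync with an async callback instead of Action<Data>
static async Task ReadAsync(Func<int, Task> processor)
{
    for (int i = 0; i < 5; i++)
    {
        await Task.Yield();     // stands in for await reader.ReadAsync()
        await processor(i);     // next row is read only after this item is handled
    }
}
```

Because each callback is awaited before the next row is read, only one item is processed at a time and the reader moves at the consumer's pace.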

If you can change your code this way, you don't need any extension or extra library or IAsyncEnumerable stuff.
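For readers on newer frameworks: C# 8.0 (with .NET Core 3.0) added `IAsyncEnumerable<T>`, async iterators and `await foreach`, which let the question's original `yield`-based shape be written asynchronously. A minimal sketch, again assuming `int` in place of `Data` and a `for` loop in place of the `SqlDataReader` loop (`ReadAsync` is hypothetical):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var total = 0;
var count = 0;

// The consumer pulls items one at a time; nothing is collected into a list
await foreach (var item in ReadAsync(5))
{
    total += item;
    count++;
}

Console.WriteLine($"Read {count} items, sum {total}");

// Hypothetical async iterator: the async counterpart of the question's Read()
static async IAsyncEnumerable<int> ReadAsync(int rows)
{
    for (int i = 0; i < rows; i++)
    {
        await Task.Yield();  // stands in for await reader.ReadAsync()
        yield return i;
    }
}
```

In real code the loop would be `while (await reader.ReadAsync()) { ... yield return item; }`, keeping the connection open only while the caller is enumerating.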

Simon Mourier answered Sep 21 '22 15:09 (score 44)