I have an IAsyncEnumerable<string> stream that contains data downloaded from the web, and I want to save asynchronously each piece of data in a SQL database. So I used the ForEachAwaitAsync extension method from the System.Linq.Async library. My problem is that downloading and saving each piece of data is happening sequentially, while I would prefer if it happened concurrently.
To clarify, I don't want to download more than one pieces of data at the same time, neither I want to save more than one pieces of data at the same time. What I want is that while I am saving a piece of data in the database, the next piece of data should be concurrently downloaded from the web.
Below is a minimal (contrived) example of my current solution. Five items are downloaded and then are saved in the database. Downloading each item takes 1 second, and saving it takes another 1 second:
async IAsyncEnumerable<string> GetDataFromWeb()
{
foreach (var item in Enumerable.Range(1, 5))
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Downloading #{item}");
await Task.Delay(1000); // Simulate an I/O-bound operation
yield return item.ToString();
}
}
var stopwatch = Stopwatch.StartNew();
await GetDataFromWeb().ForEachAwaitAsync(async item =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Saving #{item}");
await Task.Delay(1000); // Simulate an I/O-bound operation
});
Console.WriteLine($"Duration: {stopwatch.ElapsedMilliseconds:#,0} msec");
The code is working, but not in the way I want. The total duration is ~10 seconds, instead of the desirable ~6 seconds.
Actual undesirable output:
04:55:50.526 > Downloading #1
04:55:51.595 > Saving #1
04:55:52.598 > Downloading #2
04:55:53.609 > Saving #2
04:55:54.615 > Downloading #3
04:55:55.616 > Saving #3
04:55:56.617 > Downloading #4
04:55:57.619 > Saving #4
04:55:58.621 > Downloading #5
04:55:59.622 > Saving #5
Duration: 10,115 msec
Hypothetical desirable output:
04:55:50.000 > Downloading #1
04:55:51.000 > Saving #1
04:55:51.000 > Downloading #2
04:55:52.000 > Saving #2
04:55:52.000 > Downloading #3
04:55:53.000 > Saving #3
04:55:53.000 > Downloading #4
04:55:54.000 > Saving #4
04:55:54.000 > Downloading #5
04:55:55.000 > Saving #5
Duration: 6,000 msec
I am thinking about implementing a custom extension method named ForEachConcurrentAsync, having identical signature with the aforementioned ForEachAwaitAsync method, but with behavior that allows enumerating and acting on items to occur concurrently. Below is a stub of this method:
/// <summary>
/// Invokes and awaits an asynchronous action on each element in the source sequence.
/// Each action is awaited concurrently with fetching the sequence's next element.
/// </summary>
public static Task ForEachConcurrentAsync<T>(
this IAsyncEnumerable<T> source,
Func<T, Task> action,
CancellationToken cancellationToken = default)
{
// What to do?
}
How could this functionality be implemented?
Additional requirements:
action parameter contains blocking code, but this is a responsibility of the caller to prevent).Clarifications:
In case saving the data takes longer than downloading them from the web, the method should not keep downloading more items in advance. Only one piece of data should be downloaded in advance at maximum, while the previous one is saved.
The IAsyncEnumerable<string> with the web data is the starting point of this problem. I don't want to change the generator method of the IAsyncEnumerable<string>. I want to act on its elements (by saving them into the database), while the enumerable is enumerated.
It sounds like you just need to keep track of the previous action's Task and await it before the next action Task.
public static async Task ForEachConcurrentAsync<T>(
this IAsyncEnumerable<T> source,
Func<T, Task> action,
CancellationToken cancellationToken = default)
{
Task previous = null;
try
{
await source.ForEachAwaitAsync(async item =>
{
if(previous != null)
{
await previous;
}
previous = action(item);
});
}
finally
{
if(previous != null)
{
await previous;
}
}
}
All that's left is to sprinkle in the cancellation code.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With