 

How to enumerate an IAsyncEnumerable<T> and invoke an async action for each element, allowing concurrency for each iteration/action pair?

I have an IAsyncEnumerable<string> stream that contains data downloaded from the web, and I want to asynchronously save each piece of data in a SQL database. So I used the ForEachAwaitAsync extension method from the System.Linq.Async library. My problem is that downloading and saving each piece of data happen sequentially, while I would prefer them to happen concurrently.

To clarify, I don't want to download more than one piece of data at the same time, nor do I want to save more than one piece of data at the same time. What I want is that while one piece of data is being saved in the database, the next piece is concurrently downloaded from the web.

Below is a minimal (contrived) example of my current solution. Five items are downloaded and then saved in the database. Downloading each item takes 1 second, and saving it takes another 1 second:

async IAsyncEnumerable<string> GetDataFromWeb()
{
    foreach (var item in Enumerable.Range(1, 5))
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Downloading #{item}");
        await Task.Delay(1000); // Simulate an I/O-bound operation
        yield return item.ToString();
    }
}

var stopwatch = Stopwatch.StartNew();
await GetDataFromWeb().ForEachAwaitAsync(async item =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Saving #{item}");
    await Task.Delay(1000); // Simulate an I/O-bound operation
});
Console.WriteLine($"Duration: {stopwatch.ElapsedMilliseconds:#,0} msec");

The code works, but not in the way I want. The total duration is ~10 seconds, instead of the desired ~6 seconds.

Actual undesirable output:

04:55:50.526 > Downloading #1
04:55:51.595 > Saving #1
04:55:52.598 > Downloading #2
04:55:53.609 > Saving #2
04:55:54.615 > Downloading #3
04:55:55.616 > Saving #3
04:55:56.617 > Downloading #4
04:55:57.619 > Saving #4
04:55:58.621 > Downloading #5
04:55:59.622 > Saving #5
Duration: 10,115 msec

Hypothetical desirable output:

04:55:50.000 > Downloading #1
04:55:51.000 > Saving #1
04:55:51.000 > Downloading #2
04:55:52.000 > Saving #2
04:55:52.000 > Downloading #3
04:55:53.000 > Saving #3
04:55:53.000 > Downloading #4
04:55:54.000 > Saving #4
04:55:54.000 > Downloading #5
04:55:55.000 > Saving #5
Duration: 6,000 msec
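
As a rough sanity check on the ~10 second and ~6 second figures, here is the arithmetic under simplifying assumptions: every download takes d seconds, every save takes s seconds, there are n items, and scheduling overhead is ignored. The symbols d, s and n are introduced here only for illustration.

```latex
\begin{align}
T_{\text{sequential}} &= n\,(d + s) = 5\,(1 + 1) = 10\ \text{s} \\
T_{\text{overlapped}} &= d + (n - 1)\,\max(d, s) + s = 1 + 4 \cdot 1 + 1 = 6\ \text{s}
\end{align}
```

The first download and the last save cannot overlap with anything, and each of the n - 1 steps in between lasts as long as the slower of the two operations, because at most one download and one save are in flight at a time.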

I am thinking about implementing a custom extension method named ForEachConcurrentAsync, with the same signature as the aforementioned ForEachAwaitAsync method, but with behavior that allows fetching the next item and acting on the current item to occur concurrently. Below is a stub of this method:

/// <summary>
/// Invokes and awaits an asynchronous action on each element in the source sequence.
/// Each action is awaited concurrently with fetching the sequence's next element.
/// </summary>
public static Task ForEachConcurrentAsync<T>(
    this IAsyncEnumerable<T> source,
    Func<T, Task> action,
    CancellationToken cancellationToken = default)
{
    // What to do?
}

How could this functionality be implemented?

Additional requirements:

  1. Leaking running tasks in case of cancellation or failure is not acceptable. All started tasks should be completed when the method completes.
  2. In the extreme case that both the enumeration and an action fail, only one of the two exceptions should be propagated, and either one is OK.
  3. The method should be genuinely asynchronous, and should not block the current thread (unless the action parameter contains blocking code, but this is a responsibility of the caller to prevent).

Clarifications:

  1. In case saving the data takes longer than downloading it from the web, the method should not keep downloading more items in advance. At most one piece of data should be downloaded in advance, while the previous one is being saved.

  2. The IAsyncEnumerable<string> with the web data is the starting point of this problem. I don't want to change the generator method of the IAsyncEnumerable<string>. I want to act on its elements (by saving them into the database), while the enumerable is enumerated.

Theodor Zoulias asked Nov 02 '25 08:11



1 Answer

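It sounds like you just need to keep track of the previous action's Task and await it before starting the next action. That way each action runs while the next element is being fetched, and at most one action is in flight at a time.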

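public static async Task ForEachConcurrentAsync<T>(
    this IAsyncEnumerable<T> source,
    Func<T, Task> action,
    CancellationToken cancellationToken = default)
{
    // The action started for the most recently fetched item, if any.
    Task previous = null;
    try
    {
        await source.ForEachAwaitAsync(async item =>
        {
            // Wait for the previous action before starting the next one,
            // so that at most one action runs at a time.
            if (previous != null)
            {
                await previous;
            }

            // Start the action without awaiting it; it runs while the
            // next item is being fetched.
            previous = action(item);
        });
    }
    finally
    {
        // Don't leak the last action, even if the enumeration failed.
        if (previous != null)
        {
            await previous;
        }
    }
}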

All that's left is to wire in the cancellationToken parameter, which the method above accepts but does not yet use.
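
One possible way to add the cancellation handling is sketched below. It is a sketch under assumptions, not a definitive implementation: it swaps ForEachAwaitAsync for a plain await foreach so that it depends only on the base class library, and the class name AsyncEnumerableExtensions is made up for illustration. Note that WithCancellation only forwards the token to the enumerator. An iterator like the question's GetDataFromWeb, which declares no [EnumeratorCancellation] parameter, will not observe it, so the sketch also checks the token before starting each action.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical container class; the name is an assumption.
public static class AsyncEnumerableExtensions
{
    public static async Task ForEachConcurrentAsync<T>(
        this IAsyncEnumerable<T> source,
        Func<T, Task> action,
        CancellationToken cancellationToken = default)
    {
        // The action started for the most recently fetched item.
        Task previous = Task.CompletedTask;
        try
        {
            // Forward the token to the enumerator; the source observes it
            // only if it was written to accept an enumerator token.
            await foreach (var item in source
                .WithCancellation(cancellationToken)
                .ConfigureAwait(false))
            {
                // At most one action at a time: wait for the previous one.
                await previous.ConfigureAwait(false);

                // Stop before starting new work if cancellation was requested.
                cancellationToken.ThrowIfCancellationRequested();

                // Start the action without awaiting it, so that it overlaps
                // with fetching the next item.
                previous = action(item);
            }
        }
        finally
        {
            // Never leak the last started action, whether the loop ended
            // normally, was canceled, or failed. If this await throws, its
            // exception replaces any exception from the enumeration.
            await previous.ConfigureAwait(false);
        }
    }
}
```

Because the action has the shape Func<T, Task> and receives no token, an action that is already running is not interrupted by cancellation; it is simply awaited before the method completes. Called as GetDataFromWeb().ForEachConcurrentAsync(...) with the question's simulated one-second delays, this should finish in roughly 6 seconds.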

juharr answered Nov 03 '25 21:11



