Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to await multiple IAsyncEnumerable

We have code like this:

var intList = new List<int>{1,2,3};
var asyncEnumerables = intList.Select(Foo);

private async IAsyncEnumerable<int> Foo(int a)
{
  while (true)
  {
    await Task.Delay(5000);
    yield return a;
  } 
}

I need to start await foreach for every asyncEnumerable's entry. Every loop iteration should wait each other, and when every iteration is done i need to collect every iteration's data and process that by another method.

Can i somehow achieve that by TPL? Otherwise, couldn't you give me some ideas?

like image 547
Vladislav Shirshakov Avatar asked Jan 25 '23 11:01

Vladislav Shirshakov


2 Answers

What works for me is the Zip function in this repo (81 line)

I'm using it like this

var intList = new List<int> { 1, 2, 3 };
var asyncEnumerables = intList.Select(RunAsyncIterations);
var enumerableToIterate = async_enumerable_dotnet.AsyncEnumerable.Zip(s => s, asyncEnumerables.ToArray());

await foreach (int[] enumerablesConcatenation in enumerableToIterate)
{
    Console.WriteLine(enumerablesConcatenation.Sum()); //Sum returns 6
    await Task.Delay(2000);
}

static async IAsyncEnumerable<int> RunAsyncIterations(int i)
{
    while (true)
        yield return i;
}
like image 128
Vladislav Shirshakov Avatar answered Jan 28 '23 15:01

Vladislav Shirshakov


Here is a generic method Zip you could use, implemented as an iterator. The cancellationToken is decorated with the EnumeratorCancellation attribute, so that the resulting IAsyncEnumerable is WithCancellation friendly.

using System.Runtime.CompilerServices;

public static async IAsyncEnumerable<TSource[]> Zip<TSource>(
    IEnumerable<IAsyncEnumerable<TSource>> sources,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{
    var enumerators = sources
        .Select(x => x.GetAsyncEnumerator(cancellationToken))
        .ToArray();
    try
    {
        while (true)
        {
            var array = new TSource[enumerators.Length];
            for (int i = 0; i < enumerators.Length; i++)
            {
                if (!await enumerators[i].MoveNextAsync()) yield break;
                array[i] = enumerators[i].Current;
            }
            yield return array;
        }
    }
    finally
    {
        foreach (var enumerator in enumerators)
        {
            await enumerator.DisposeAsync();
        }
    }
}

Usage example:

await foreach (int[] result in Zip(asyncEnumerables))
{
    Console.WriteLine($"Result: {String.Join(", ", result)}");
}
like image 34
Theodor Zoulias Avatar answered Jan 28 '23 13:01

Theodor Zoulias