
How to query two IAsyncEnumerables asynchronously

I have two methods connected to two different sources of Foos, each returning an IAsyncEnumerable<Foo>. I need to fetch all Foos from both sources before I can process them.

Problem: I would like to query both sources simultaneously (asynchronously), i.e. without waiting for Source1 to complete its enumeration before starting to enumerate Source2. From my understanding, that waiting is exactly what happens in the SequentialSourcesQuery method below. Am I right?

With regular tasks, I would just start the first Task, then the second one, and await Task.WhenAll. But I am a bit confused about how to handle IAsyncEnumerable.

public class FoosAsync
{
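    // Placeholder body: the real implementation would yield Foos from the first source.
    public async IAsyncEnumerable<Foo> Source1() { yield break; }

    // Placeholder body: the real implementation would yield Foos from the second source.
    public async IAsyncEnumerable<Foo> Source2() { yield break; }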

    public async Task<List<Foo>> SequentialSourcesQuery()
    {
        List<Foo> foos = new List<Foo>();

        await foreach (Foo foo1 in Source1())
        {
            foos.Add(foo1);
        }

        await foreach (Foo foo2 in Source2())
        { //doesn't start until Source1 completed the enumeration? 
            foos.Add(foo2);
        }

        return foos;
    }
}
XavierAM asked Jan 24 '23 11:01


1 Answer

You could take advantage of the libraries System.Linq.Async and System.Interactive.Async (owned by the RxTeam who are part of the .NET Foundation). They contain operators like Merge and ToListAsync that could solve your problem easily.

// Merges elements from all of the specified async-enumerable sequences
// into a single async-enumerable sequence.
public static IAsyncEnumerable<TSource> Merge<TSource>(
    params IAsyncEnumerable<TSource>[] sources);

// Creates a list from an async-enumerable sequence.
public static ValueTask<List<TSource>> ToListAsync<TSource>(
    this IAsyncEnumerable<TSource> source,
    CancellationToken cancellationToken = default);

Putting everything together:

public Task<List<Foo>> SequentialSourcesQuery()
{
    return AsyncEnumerableEx.Merge(Source1(), Source2()).ToListAsync().AsTask();
}

Be aware that these libraries focus on providing a rich set of features, not on performance or efficiency. So if top-notch performance is important for your use case, niki.kante's solution will most probably outperform the above operator-based approach.
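If you would rather not take a dependency on those libraries, the Task.WhenAll idea from the question can be applied by wrapping each enumeration in its own task. The following is only a minimal sketch under stated assumptions, not niki.kante's solution: the Foo record, the simulated delays inside Source1 and Source2, and the names DrainAsync and ConcurrentSourcesQuery are all hypothetical and exist purely for illustration. It uses nothing beyond the base class library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical element type; the question does not show what Foo looks like.
public record Foo(string Source, int Id);

public static class FoosConcurrent
{
    // Hypothetical stand-in for the first source; the delay simulates I/O latency.
    public static async IAsyncEnumerable<Foo> Source1()
    {
        for (int i = 0; i < 3; i++)
        {
            await Task.Delay(50);
            yield return new Foo("Source1", i);
        }
    }

    // Hypothetical stand-in for the second source.
    public static async IAsyncEnumerable<Foo> Source2()
    {
        for (int i = 0; i < 3; i++)
        {
            await Task.Delay(50);
            yield return new Foo("Source2", i);
        }
    }

    // Drains a single sequence into its own list, so no list is shared between tasks.
    private static async Task<List<Foo>> DrainAsync(IAsyncEnumerable<Foo> source)
    {
        var list = new List<Foo>();
        await foreach (Foo foo in source)
        {
            list.Add(foo);
        }
        return list;
    }

    public static async Task<List<Foo>> ConcurrentSourcesQuery()
    {
        // Each call runs synchronously up to its first incomplete await,
        // so both enumerations are in flight before either one is awaited.
        Task<List<Foo>> task1 = DrainAsync(Source1());
        Task<List<Foo>> task2 = DrainAsync(Source2());

        List<Foo>[] results = await Task.WhenAll(task1, task2);

        // Concatenate: all items of Source1 first, then all items of Source2.
        var foos = new List<Foo>(results[0].Count + results[1].Count);
        foos.AddRange(results[0]);
        foos.AddRange(results[1]);
        return foos;
    }
}
```

Unlike Merge, which interleaves elements as they arrive, this sketch keeps the items of each source grouped together. With sources that really wait on I/O, the total time should be close to that of the slower source rather than the sum of both.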

Theodor Zoulias answered Jan 27 '23 00:01