
Parallelize yield return inside C#8 IAsyncEnumerable<T>

I have a method that returns an async enumerable:

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        foreach (var item in ListOfWorkItems)
        {
            yield return DoWork(item);
        }
    }

And the caller:

    public async Task LogResultsAsync()
    {
        await foreach (var result in DoWorkAsync())
        {
            Console.WriteLine(result);
        }
    }

Because DoWork is an expensive operation, I'd prefer to somehow parallelize it, so it works similar to:

    public async IAsyncEnumerable<IResult> DoWorkAsync()
    {
        await Something();
        Parallel.ForEach(ListOfWorkItems, item =>
        {
            yield return DoWork(item);
        });
    }

However, I can't use yield return inside the lambda passed to Parallel.ForEach, so what's the best way to go about this?

The order of returned results doesn't matter.

Thanks.

Edit: Sorry, I left out some code in DoWorkAsync. It does await something; I had omitted that because it isn't very relevant to the question. Updated now.

Edit 2: DoWork is mostly I/O-bound in my case; it reads data from a database.

Godsent asked Jun 25 '26 23:06



2 Answers

Here is a basic implementation that uses a TransformBlock from the TPL Dataflow library:

public async IAsyncEnumerable<IResult> GetResults(List<IWorkItem> workItems)
{
    // Define the dataflow block (namespace System.Threading.Tasks.Dataflow)
    var block = new TransformBlock<IWorkItem, IResult>(async item =>
    {
        return await TransformAsync(item);
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 10, // the default is 1
        EnsureOrdered = false // the default is true
    });

    // Feed the block with input data
    foreach (var item in workItems)
    {
        block.Post(item);
    }
    block.Complete();

    // Stream the block's output as IAsyncEnumerable
    while (await block.OutputAvailableAsync())
    {
        while (block.TryReceive(out var result))
        {
            yield return result;
        }
    }

    // Propagate the first exception, if any.
    await block.Completion;
}

This implementation is not perfect. If the consumer of the IAsyncEnumerable abandons the enumeration prematurely, the TransformBlock keeps working in the background until every work item has been processed. It also doesn't support cancellation, which any respectable IAsyncEnumerable-producing method should. Both features can be added relatively easily; if you are interested in adding them, look at this question.
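
A minimal sketch of how those two features might be added, not a definitive implementation. The names ParallelStreaming, TransformParallel and maxParallelism are hypothetical and not part of the original answer. The idea is to give the block a linked CancellationTokenSource that is canceled either by the caller's token or by a finally block, which also runs when the consumer leaves the await foreach early:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ParallelStreaming
{
    // Hypothetical helper: runs `transform` on up to `maxParallelism` items
    // at a time and streams the results in completion order.
    public static async IAsyncEnumerable<TResult> TransformParallel<TItem, TResult>(
        IEnumerable<TItem> items,
        Func<TItem, Task<TResult>> transform,
        int maxParallelism,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Canceled by the caller's token, or by the finally block below.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        var block = new TransformBlock<TItem, TResult>(transform,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxParallelism,
                EnsureOrdered = false,
                CancellationToken = cts.Token
            });

        // Feed the block, then signal that no more input is coming.
        foreach (var item in items) block.Post(item);
        block.Complete();

        try
        {
            // Stream the output; throws OperationCanceledException on cancellation.
            while (await block.OutputAvailableAsync(cts.Token))
            {
                while (block.TryReceive(out var result))
                    yield return result;
            }

            // Propagate the first exception, if any.
            await block.Completion;
        }
        finally
        {
            // Runs on normal completion, on error, and when the consumer
            // disposes the enumerator early; tells the block to stop
            // processing items it has not started yet.
            cts.Cancel();
        }
    }
}
```

Items that are already being processed when the token is canceled still run to completion unless the transform itself observes a token, so this only stops work that hasn't started.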

Another imperfection: if await TransformAsync(item) throws an OperationCanceledException, the error is suppressed and that item simply produces no output. This behavior is by design in TPL Dataflow.
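
To illustrate the suppression, here is a small sketch. The class and method names are made up for the demonstration. Three items are posted, the transform throws an OperationCanceledException for one of them, and the block neither emits a result for that item nor fails its Completion task:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CancellationSuppressionDemo
{
    // Returns how many results the block emits for the inputs 1, 2 and 3.
    public static async Task<int> CountResultsAsync()
    {
        var block = new TransformBlock<int, int>(async n =>
        {
            await Task.Yield();
            // Simulates a transform that gets canceled for one item.
            if (n == 2) throw new OperationCanceledException();
            return n * 10;
        });

        for (int n = 1; n <= 3; n++) block.Post(n);
        block.Complete();

        int count = 0;
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out _)) count++;
        }

        // Completes successfully; the OperationCanceledException is not rethrown.
        await block.Completion;
        return count; // 2: the canceled item was silently dropped
    }
}
```

If a dropped item must not go unnoticed, one option is to catch the OperationCanceledException inside the transform and rethrow it wrapped in a different exception type.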


.NET 6 update: .NET 6 introduced a new API, DataflowBlock.ReceiveAllAsync, that can simplify streaming the block's output. There is a gotcha, though; see this answer for details.
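
A rough sketch of what the streaming part might look like with ReceiveAllAsync. ReceiveAllSketch and Stream are hypothetical names. The sketch still awaits block.Completion after the loop, on the assumption that ReceiveAllAsync on its own just ends the sequence when the block completes and does not surface a failure of the block:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ReceiveAllSketch
{
    // Hypothetical helper mirroring GetResults, with the manual
    // OutputAvailableAsync/TryReceive loop replaced by ReceiveAllAsync.
    public static async IAsyncEnumerable<TResult> Stream<TItem, TResult>(
        IEnumerable<TItem> items,
        Func<TItem, Task<TResult>> transform,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var block = new TransformBlock<TItem, TResult>(transform,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 10,
                EnsureOrdered = false
            });

        foreach (var item in items) block.Post(item);
        block.Complete();

        // Stream the block's output as it becomes available.
        await foreach (var result in block.ReceiveAllAsync(cancellationToken))
        {
            yield return result;
        }

        // Propagate the first exception, if any.
        await block.Completion;
    }
}
```

As with the original version, this sketch does not stop the block when the consumer abandons the enumeration early.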

Theodor Zoulias answered Jun 28 '26 13:06



As suggested by canton7, you could use AsParallel instead of Parallel.ForEach.

This can be consumed inside a standard foreach loop where you can yield the results:

public async IAsyncEnumerable<IResult> DoWorkAsync()
{
    await Something();
    foreach (var result in ListOfWorkItems.AsParallel().Select(DoWork))
    {
        yield return result;
    }
}

As mentioned by Theodor Zoulias, the returned enumerable isn't actually asynchronous at all: the foreach over the PLINQ query blocks the calling thread while it waits for the next result.

If you simply need to consume this using await foreach, this shouldn't be a problem. To be more explicit, though, you could return the IEnumerable and have the caller parallelise it:

public async Task<IEnumerable<Item>> DoWorkAsync()
{
    await Something();
    return ListOfWorkItems;
}

// Caller...
Parallel.ForEach(await DoWorkAsync(), item => 
{
    var result = DoWork(item);
    //...
});

Although this may be less maintainable if it needs to be called in multiple places.

Johnathan Barclay answered Jun 28 '26 11:06
