
How to implement an efficient WhenEach that streams an IAsyncEnumerable of task results?

I am trying to update my toolset with the new tools offered by C# 8, and one method that seems particularly useful is a version of Task.WhenAll that returns an IAsyncEnumerable. This method should stream the task results as soon as they become available, so naming it WhenAll doesn't make much sense. WhenEach sounds more appropriate. The signature of the method is:

public static IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks);

This method could be used like this:

var tasks = new Task<int>[]
{
    ProcessAsync(1, 300),
    ProcessAsync(2, 500),
    ProcessAsync(3, 400),
    ProcessAsync(4, 200),
    ProcessAsync(5, 100),
};

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}

static async Task<int> ProcessAsync(int result, int delay)
{
    await Task.Delay(delay);
    return result;
}

Expected output:

Processed: 5
Processed: 4
Processed: 1
Processed: 3
Processed: 2
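For contrast, here is a minimal sketch of what the built-in Task.WhenAll gives for the same ProcessAsync tasks. The results arrive as one array in input order, and only after the slowest task (500 ms) has finished. That is why a streaming WhenEach is wanted. The timing printout is only illustrative and depends on timer accuracy.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

var stopwatch = Stopwatch.StartNew();
var tasks = new Task<int>[]
{
    ProcessAsync(1, 300),
    ProcessAsync(2, 500),
    ProcessAsync(3, 400),
    ProcessAsync(4, 200),
    ProcessAsync(5, 100),
};

// Task.WhenAll yields a single array ordered like the input array,
// and only after every task (including the 500 ms one) has completed
int[] all = await Task.WhenAll(tasks);
Console.WriteLine($"{string.Join(", ", all)} after ~{stopwatch.ElapsedMilliseconds} ms");

static async Task<int> ProcessAsync(int result, int delay)
{
    await Task.Delay(delay);
    return result;
}
```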

I managed to write a basic implementation using the method Task.WhenAny in a loop, but there is a problem with this approach:

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(
    Task<TResult>[] tasks)
{
    var hashSet = new HashSet<Task<TResult>>(tasks);
    while (hashSet.Count > 0)
    {
        var task = await Task.WhenAny(hashSet).ConfigureAwait(false);
        yield return await task.ConfigureAwait(false);
        hashSet.Remove(task);
    }
}

The problem is performance. Task.WhenAny creates a defensive copy of the supplied list of tasks, so calling it repeatedly in a loop results in O(n²) computational complexity. My naive implementation struggles to process 10,000 tasks: the overhead is nearly 10 seconds on my machine. I would like the method to be nearly as performant as the built-in Task.WhenAll, which can handle hundreds of thousands of tasks with ease. How could I improve the WhenEach method so that it performs decently?
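The quadratic estimate can be made concrete with a quick count. Assume each Task.WhenAny call does work proportional to the number of tasks still pending (the defensive copy described above). Then the first call handles n tasks, the next n − 1, and so on down to 1:

```latex
\sum_{k=1}^{n} k = \frac{n(n+1)}{2},
\qquad
n = 10\,000 \;\Rightarrow\; \frac{10\,000 \cdot 10\,001}{2} = 50\,005\,000
```

By comparison, Task.WhenAll touches each task roughly once, which is on the order of 10,000 operations for the same input.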

Asked Oct 02 '19 01:10 by Theodor Zoulias



2 Answers

By using code from this article, you can implement the following:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static Task<Task<T>>[] Interleaved<T>(IEnumerable<Task<T>> tasks)
{
    var inputTasks = tasks.ToList();

    // One "bucket" per input task; bucket i receives the i-th task to complete
    var buckets = new TaskCompletionSource<Task<T>>[inputTasks.Count];
    var results = new Task<Task<T>>[buckets.Length];
    for (int i = 0; i < buckets.Length; i++)
    {
        buckets[i] = new TaskCompletionSource<Task<T>>();
        results[i] = buckets[i].Task;
    }

    // Each completing task claims the next free bucket, so the buckets fill up
    // in completion order; one continuation per task keeps the total work O(n)
    int nextTaskIndex = -1;
    Action<Task<T>> continuation = completed =>
    {
        var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
        bucket.TrySetResult(completed);
    };

    foreach (var inputTask in inputTasks)
        inputTask.ContinueWith(continuation, CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

    return results;
}

Then change your WhenEach method to call Interleaved:

public static async IAsyncEnumerable<TResult> WhenEach<TResult>(Task<TResult>[] tasks)
{
    foreach (var bucket in Interleaved(tasks))
    {
        var t = await bucket;   // the next input task to complete
        yield return await t;   // its result (or its exception)
    }
}

Then you can call WhenEach as usual:

await foreach (int result in WhenEach(tasks))
{
    Console.WriteLine($"Processed: {result}");
}

I did some rudimentary benchmarking with 10k tasks, and this version was about 5 times faster.
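To reproduce a comparison like this, a rough harness along the following lines could be used. It is a sketch, not the benchmark that was actually run: the task count, the random delays of up to one second, and the helper names MeasureAsync, WhenEachNaive and WhenEachInterleaved are hypothetical. WhenEachNaive is the Task.WhenAny loop from the question, and WhenEachInterleaved inlines the Interleaved helper above, specialized to int. The elapsed time includes the roughly one-second spread of the delays, so only the difference between the two variants is meaningful. Timings depend on the machine, so no numbers are shown; a dedicated tool such as BenchmarkDotNet is preferable for rigorous measurements.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int N = 5_000; // hypothetical task count; increase it to see the quadratic growth

long naiveSum = await MeasureAsync("WhenAny loop", WhenEachNaive, N);
long interleavedSum = await MeasureAsync("Interleaved", WhenEachInterleaved, N);

// Runs one WhenEach variant over `count` tasks that finish at random times within ~1 s
static async Task<long> MeasureAsync(
    string name, Func<Task<int>[], IAsyncEnumerable<int>> whenEach, int count)
{
    var random = new Random(42); // same seed, so both variants get the same delays
    var tasks = Enumerable.Range(0, count)
        .Select(i => Task.Delay(random.Next(1000)).ContinueWith(_ => i, TaskScheduler.Default))
        .ToArray();
    var stopwatch = Stopwatch.StartNew();
    long sum = 0;
    await foreach (int result in whenEach(tasks))
        sum += result;
    Console.WriteLine($"{name}: {stopwatch.ElapsedMilliseconds} ms");
    return sum;
}

// The question's approach: Task.WhenAny over the pending set on every iteration
static async IAsyncEnumerable<int> WhenEachNaive(Task<int>[] tasks)
{
    var pending = new HashSet<Task<int>>(tasks);
    while (pending.Count > 0)
    {
        var task = await Task.WhenAny(pending).ConfigureAwait(false);
        yield return await task.ConfigureAwait(false);
        pending.Remove(task);
    }
}

// The answer's approach: one continuation per task fills buckets in completion order
static async IAsyncEnumerable<int> WhenEachInterleaved(Task<int>[] tasks)
{
    var buckets = new TaskCompletionSource<Task<int>>[tasks.Length];
    for (int i = 0; i < buckets.Length; i++)
        buckets[i] = new TaskCompletionSource<Task<int>>();

    int next = -1;
    foreach (var task in tasks)
        task.ContinueWith(
            completed => buckets[Interlocked.Increment(ref next)].TrySetResult(completed),
            CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

    foreach (var bucket in buckets)
        yield return await await bucket.Task; // next completed task, then its result
}
```

Both variants should consume every task exactly once, so the sums they return must match.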

Answered Sep 18 '22 20:09 by JohanP


You can use a Channel as an async queue. Each task can write to the channel when it completes. Items in the channel will be returned as an IAsyncEnumerable through ChannelReader.ReadAllAsync.

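using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<Task<T>> inputTasks)
{
    // Unbounded channel used as an async queue of completed results
    var channel = Channel.CreateUnbounded<T>();
    var writer = channel.Writer;
    // Each continuation writes its task's result as soon as that task completes;
    // TaskScheduler.Default avoids picking up an unexpected ambient scheduler
    var continuations = inputTasks.Select(t => t.ContinueWith(x =>
                                           writer.TryWrite(x.Result), TaskScheduler.Default));
    // Close the channel once every continuation has run; if an input task faulted,
    // x.Result threw and that exception is handed to Complete
    _ = Task.WhenAll(continuations)
            .ContinueWith(t => writer.Complete(t.Exception), TaskScheduler.Default);

    return channel.Reader.ReadAllAsync();
}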

When all tasks complete, writer.Complete() is called to close the channel. If any task failed, its exception is passed to Complete.

To test this, the following code produces tasks with decreasing delays, so it should return the indexes in reverse order:

var tasks = Enumerable.Range(1, 4)
                      .Select(async i =>
                      {
                          await Task.Delay(300 * (5 - i));
                          return i;
                      });

await foreach (var i in ToAsyncEnumerable(tasks))
{
    Console.WriteLine(i);
}

Produces:

4
3
2
1
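Because the Task.WhenAll continuation passes t.Exception to writer.Complete, a failed input task does not silently disappear. With an unbounded channel, the reader should first drain the results that were already written, and then the exception is thrown out of the await foreach loop. The sketch below checks this. The ToAsyncEnumerable helper is reproduced so the block is self-contained, and DelayThen and FailAfter are hypothetical helpers, the second of which throws InvalidOperationException. The exact exception type that ReadAllAsync surfaces is deliberately not asserted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var tasks = new[]
{
    DelayThen(50, 1),
    FailAfter(100),   // hypothetical failing task
    DelayThen(150, 3),
};

var received = new List<int>();
Exception? error = null;
try
{
    await foreach (int value in ToAsyncEnumerable(tasks))
        received.Add(value);
}
catch (Exception ex)
{
    error = ex; // the exception handed to writer.Complete surfaces here
}
Console.WriteLine($"Received: {string.Join(", ", received)}; error: {error?.GetType().Name}");

static IAsyncEnumerable<T> ToAsyncEnumerable<T>(IEnumerable<Task<T>> inputTasks)
{
    var channel = Channel.CreateUnbounded<T>();
    var writer = channel.Writer;
    // x.Result throws for the faulted task, faulting only that continuation
    var continuations = inputTasks.Select(t => t.ContinueWith(x =>
        writer.TryWrite(x.Result), TaskScheduler.Default));
    // WhenAll faults once all continuations finish; the error closes the channel
    _ = Task.WhenAll(continuations)
        .ContinueWith(t => writer.Complete(t.Exception), TaskScheduler.Default);
    return channel.Reader.ReadAllAsync();
}

static async Task<int> DelayThen(int delay, int value)
{
    await Task.Delay(delay);
    return value;
}

static async Task<int> FailAfter(int delay)
{
    await Task.Delay(delay);
    throw new InvalidOperationException("simulated failure");
}
```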
Answered Sep 17 '22 20:09 by Panagiotis Kanavos