Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is it ok to await a collection of 1000 tasks in a .NET app? Or should batching be used?

I have a simple console app testing out some code.

My code has a list of 1000 numbers and putting each number/int onto an Azure Queue.

Now, I do this asyncronously and it's works great. here's my code from my library:

var tasks = stagedFileIds.Select(stagedFileId => QueueFileToProcessAsync(stagedFileId));
await Task.WhenAll(tasks)
          .ConfigureAwait(false);

Works great.

But .. is this a bad thing? should I batch this into 50's or 25's or something? But most importantly ... batch it?

What is 'the cost' of doing the above code?

Remember, this is a console app right now. I'm going to move this to an Azure Function at some point.

like image 801
Pure.Krome Avatar asked Oct 30 '22 09:10

Pure.Krome


1 Answers

You should throttle them in a asynchronous way, to make sure you are not making too many QueueFileToProcessAsync operations in parallel, unless you are sure it is harmless. I recommend you Stephen Cleary introduction to TPL Dataflow, where part 3 and his other post Async Producer/Consumer Queue using Dataflow addresses throttling.

If you are calling and endpoint you will be throttled probably by the ServicePointManager.DefaultConnectionLimit as @Gerino pointed out.

Just for the craic, if you had to implement this yourself without TPL Dataflow, you can do it with the .NET Concurrent Collections:

// prototype code
static class TaskThrottlingExtension
{
    public static async Task ThrottleProcessingAsync<T>(this IEnumerable<T> inputs, int parallel, Func<T, Task> process)
    {
        var queues = new BlockingCollection<T>[parallel];
        var tasks = new Task[parallel];
        for (int i = 0; i < parallel; i++)
        {
            var queue = queues[i] = new BlockingCollection<T>(1);
            tasks[i] = Task.Run( async () =>
            {
                foreach (var input in queue.GetConsumingEnumerable())
                {
                    await process(input).ConfigureAwait(false);
                }
            });
        }

        try
        {
            foreach (var input in inputs)
            {
                BlockingCollection<T>.AddToAny(queues, input);
            }

            foreach (var queue in queues)
            {
                queue.CompleteAdding();
            }

            await Task.WhenAll(tasks).ConfigureAwait(false);
        }
        finally
        {
            foreach (var queue in queues)
            {
                queue.Dispose();
            }
        }
    }
}
like image 117
vtortola Avatar answered Nov 09 '22 13:11

vtortola