Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to throttle multiple asynchronous tasks?

I have some code of the following form:

static async Task DoSomething(int n) 
{
  ...
}

static void RunThreads(int totalThreads, int throttle) 
{
  var tasks = new List<Task>();
  for (var n = 0; n < totalThreads; n++)
  {
    var task = DoSomething(n);
    tasks.Add(task);
  }
  Task.WhenAll(tasks).Wait(); // all threads must complete
}

Trouble is, if I don't throttle the threads, things start falling apart. Now, I want to launch a maximum of throttle threads, and only start the new thread when an old one is complete. I've tried a few approaches and none so far has worked. Problems I have encountered include:

  • The tasks collection must be fully populated with all tasks, whether active or awaiting execution, otherwise the final .Wait() call only looks at the threads that it started with.
  • Chaining the execution seems to require use of Task.Run() or the like. But I need a reference to each task from the outset, and instantiating a task seems to kick it off automatically, which is what I don't want.

How to do this?

like image 926
Shaul Behr Avatar asked Aug 17 '15 09:08

Shaul Behr


3 Answers

.NET 6 introduces Parallel.ForEachAsync. You could rewrite your code like this:

static async ValueTask DoSomething(int n)
{
    ...
}

static Task RunThreads(int totalThreads, int throttle)
    => Parallel.ForEachAsync(Enumerable.Range(0, totalThreads), new ParallelOptions() { MaxDegreeOfParallelism = throttle }, (i, _) => DoSomething(i));

Notes:

  • I had to change the return type of your DoSomething function from Task to ValueTask.
  • You probably want to avoid the .Wait() call, so I made the RunThreads method async.
  • It is not obvious from your example why you need access to the individual tasks. This code does not give you access to the tasks, but might still be helpful in many cases.
like image 75
Robert Hegner Avatar answered Sep 29 '22 17:09

Robert Hegner


First, abstract away from threads. Especially since your operation is asynchronous, you shouldn't be thinking about "threads" at all. In the asynchronous world, you have tasks, and you can have a huge number of tasks compared to threads.

Throttling asynchronous code can be done using SemaphoreSlim:

static async Task DoSomething(int n);

static void RunConcurrently(int total, int throttle) 
{
  var mutex = new SemaphoreSlim(throttle);
  var tasks = Enumerable.Range(0, total).Select(async item =>
  {
    await mutex.WaitAsync();
    try { await DoSomething(item); }
    finally { mutex.Release(); }
  });
  Task.WhenAll(tasks).Wait();
}
like image 23
Stephen Cleary Avatar answered Oct 07 '22 12:10

Stephen Cleary


The simplest option IMO is to use TPL Dataflow. You just create an ActionBLock, limit it by the desired parallelism and start posting items into it. It makes sure to only run a certain amount of tasks at the same time, and when a task completes, it starts executing the next item:

async Task RunAsync(int totalThreads, int throttle) 
{
    var block = new ActionBlock<int>(
        DoSomething,
        new ExecutionDataFlowOptions { MaxDegreeOfParallelism = throttle });

    for (var n = 0; n < totalThreads; n++)
    {
        block.Post(n);
    }

    block.Complete();
    await block.Completion;
}
like image 16
i3arnon Avatar answered Oct 07 '22 12:10

i3arnon