Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Throttling asynchronous tasks

I would like to run a bunch of async tasks, with a limit on how many tasks may be pending completion at any given time.

Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open up a connection to the next URL in the list. That way, there are always exactly 50 connections open at a time, until the URL list is exhausted.

I also want to utilize a given number of threads if possible.

I came up with an extension method, ThrottleTasksAsync that does what I want. Is there a simpler solution already out there? I would assume that this is a common scenario.

Usage:

class Program {     static void Main(string[] args)     {         Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();          Console.WriteLine("Press a key to exit...");         Console.ReadKey(true);     } } 

Here is the code:

static class IEnumerableExtensions {     public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)     {         var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());          var semaphore = new SemaphoreSlim(maxConcurrentTasks);          // Run the throttler on a separate thread.         var t = Task.Run(() =>         {             foreach (var item in enumerable)             {                 // Wait for the semaphore                 semaphore.Wait();                 blockingQueue.Add(item);             }              blockingQueue.CompleteAdding();         });          var taskList = new List<Task<Result_T>>();          Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },         _ =>         {             Enumerable_T item;              if (blockingQueue.TryTake(out item, 100))             {                 taskList.Add(                     // Run the task                     taskToRun(item)                     .ContinueWith(tsk =>                         {                             // For effect                             Thread.Sleep(2000);                              // Release the semaphore                             semaphore.Release();                              return tsk.Result;                         }                     )                 );             }         });          // Await all the tasks.         return await Task.WhenAll(taskList);     }      static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)     {         while (!condition()) yield return true;     } } 

The method utilizes BlockingCollection and SemaphoreSlim to make it work. The throttler is run on one thread, and all the async tasks are run on the other thread. To achieve parallelism, I added a maxDegreeOfParallelism parameter that's passed to a Parallel.ForEach loop re-purposed as a while loop.

The old version was:

foreach (var master = ...) {     var details = ...;     Parallel.ForEach(details, detail => {         // Process each detail record here     }, new ParallelOptions { MaxDegreeOfParallelism = 15 });     // Perform the final batch updates here } 

But, the thread pool gets exhausted fast, and you can't do async/await.

Bonus: To get around the problem in BlockingCollection where an exception is thrown in Take() when CompleteAdding() is called, I'm using the TryTake overload with a timeout. If I didn't use the timeout in TryTake, it would defeat the purpose of using a BlockingCollection since TryTake won't block. Is there a better way? Ideally, there would be a TakeAsync method.

like image 556
Josh Wyant Avatar asked Mar 18 '14 22:03

Josh Wyant


1 Answers

As suggested, use TPL Dataflow.

A TransformBlock<TInput, TOutput> may be what you're looking for.

You define a MaxDegreeOfParallelism to limit how many strings can be transformed (i.e., how many urls can be downloaded) in parallel. You then post urls to the block, and when you're done you tell the block you're done adding items and you fetch the responses.

var downloader = new TransformBlock<string, HttpResponse>(         url => Download(url),         new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }     );  var buffer = new BufferBlock<HttpResponse>(); downloader.LinkTo(buffer);  foreach(var url in urls)     downloader.Post(url);     //or await downloader.SendAsync(url);  downloader.Complete(); await downloader.Completion;  IList<HttpResponse> responses; if (buffer.TryReceiveAll(out responses)) {     //process responses } 

Note: The TransformBlock buffers both its input and output. Why, then, do we need to link it to a BufferBlock?

Because the TransformBlock won't complete until all items (HttpResponse) have been consumed, and await downloader.Completion would hang. Instead, we let the downloader forward all its output to a dedicated buffer block - then we wait for the downloader to complete, and inspect the buffer block.

like image 186
dcastro Avatar answered Sep 18 '22 12:09

dcastro