I would like to run a bunch of async tasks, with a limit on how many tasks may be pending completion at any given time.
Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open up a connection to the next URL in the list. That way, there are always exactly 50 connections open at a time, until the URL list is exhausted.
I also want to utilize a given number of threads if possible.
I came up with an extension method, ThrottleTasksAsync, that does what I want. Is there a simpler solution already out there? I would assume that this is a common scenario.
Usage:
    class Program
    {
        static void Main(string[] args)
        {
            Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i =>
            {
                Console.WriteLine(i);
                return i;
            }).Wait();

            Console.WriteLine("Press a key to exit...");
            Console.ReadKey(true);
        }
    }
Here is the code:
    static class IEnumerableExtensions
    {
        public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(
            this IEnumerable<Enumerable_T> enumerable,
            int maxConcurrentTasks,
            int maxDegreeOfParallelism,
            Func<Enumerable_T, Task<Result_T>> taskToRun)
        {
            var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
            var semaphore = new SemaphoreSlim(maxConcurrentTasks);

            // Run the throttler on a separate thread.
            var t = Task.Run(() =>
            {
                foreach (var item in enumerable)
                {
                    // Wait for the semaphore
                    semaphore.Wait();
                    blockingQueue.Add(item);
                }

                blockingQueue.CompleteAdding();
            });

            var taskList = new List<Task<Result_T>>();

            Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted),
                new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
                _ =>
                {
                    Enumerable_T item;
                    if (blockingQueue.TryTake(out item, 100))
                    {
                        // Run the task
                        var task = taskToRun(item)
                            .ContinueWith(tsk =>
                            {
                                // For effect
                                Thread.Sleep(2000);

                                // Release the semaphore
                                semaphore.Release();

                                return tsk.Result;
                            });

                        // List<T> is not thread-safe, so guard adds made from parallel iterations.
                        lock (taskList)
                        {
                            taskList.Add(task);
                        }
                    }
                });

            // Await all the tasks.
            return await Task.WhenAll(taskList);
        }

        static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
        {
            while (!condition()) yield return true;
        }
    }
The method utilizes BlockingCollection and SemaphoreSlim to make it work. The throttler is run on one thread, and all the async tasks are run on the other thread. To achieve parallelism, I added a maxDegreeOfParallelism parameter that's passed to a Parallel.ForEach loop re-purposed as a while loop.
The old version was:
    foreach (var master in ...)
    {
        var details = ...;

        Parallel.ForEach(details,
            new ParallelOptions { MaxDegreeOfParallelism = 15 },
            detail =>
            {
                // Process each detail record here
            });

        // Perform the final batch updates here
    }
But the thread pool gets exhausted fast, and you can't do async/await.
Bonus: To get around the problem in BlockingCollection where an exception is thrown in Take() when CompleteAdding() is called, I'm using the TryTake overload with a timeout. If I didn't use the timeout in TryTake, it would defeat the purpose of using a BlockingCollection, since TryTake won't block. Is there a better way? Ideally, there would be a TakeAsync method.
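On the Take() point, one option that avoids both the exception and the polling timeout is BlockingCollection<T>.GetConsumingEnumerable(): it blocks while the collection is empty and ends the loop once CompleteAdding() has been called and the collection is drained. Here is a minimal sketch; the class name, the helper, and the integer items are made up for illustration. It still blocks a thread, so it is not a substitute for a real TakeAsync.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ConsumingEnumerableDemo
{
    // Hypothetical helper: produces 1..count on another thread and drains them here.
    public static List<int> ProduceAndConsume(int count)
    {
        // With no backing collection specified, BlockingCollection uses a FIFO queue.
        var queue = new BlockingCollection<int>();

        var producer = Task.Run(() =>
        {
            for (int i = 1; i <= count; i++)
                queue.Add(i);

            // Signal consumers that no more items will arrive.
            queue.CompleteAdding();
        });

        var received = new List<int>();

        // Blocks while the queue is empty; the loop ends without throwing
        // once CompleteAdding() has been called and the queue is empty.
        foreach (var item in queue.GetConsumingEnumerable())
            received.Add(item);

        producer.Wait();
        return received;
    }
}
```

Calling ConsumingEnumerableDemo.ProduceAndConsume(5) should return the five items without any timeout tuning.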
As suggested, use TPL Dataflow. A TransformBlock<TInput, TOutput> may be what you're looking for.
You define a MaxDegreeOfParallelism to limit how many strings can be transformed (i.e., how many URLs can be downloaded) in parallel. You then post URLs to the block, and when you're done you tell the block you're done adding items and you fetch the responses.
    var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

    var buffer = new BufferBlock<HttpResponse>();
    downloader.LinkTo(buffer);

    foreach (var url in urls)
        downloader.Post(url);
        // or: await downloader.SendAsync(url);

    downloader.Complete();
    await downloader.Completion;

    IList<HttpResponse> responses;
    if (buffer.TryReceiveAll(out responses))
    {
        // process responses
    }
Note: The TransformBlock buffers both its input and output. Why, then, do we need to link it to a BufferBlock?
Because the TransformBlock won't complete until all items (HttpResponse) have been consumed, and await downloader.Completion would hang. Instead, we let the downloader forward all its output to a dedicated buffer block; then we wait for the downloader to complete, and inspect the buffer block.
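To see the throttling and the buffer hand-off without touching the network, here is a self-contained sketch of the same pipeline. FakeDownloadAsync, the int result type (instead of HttpResponse), the class name, and the in-flight counter are all assumptions added for illustration, not part of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // System.Threading.Tasks.Dataflow NuGet package on older frameworks

static class ThrottledDownloadDemo
{
    // Hypothetical stand-in for a real download: waits briefly, returns the URL's length.
    static async Task<int> FakeDownloadAsync(string url)
    {
        await Task.Delay(20);
        return url.Length;
    }

    // Returns the results plus the highest number of downloads seen in flight at once.
    public static async Task<(IList<int> Results, int MaxInFlight)> RunAsync(
        IEnumerable<string> urls, int maxConcurrent)
    {
        int inFlight = 0, maxInFlight = 0;
        var gate = new object();

        var downloader = new TransformBlock<string, int>(
            async url =>
            {
                lock (gate) { inFlight++; maxInFlight = Math.Max(maxInFlight, inFlight); }
                try { return await FakeDownloadAsync(url); }
                finally { lock (gate) { inFlight--; } }
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxConcurrent });

        // Dedicated sink so the downloader can empty its output queue and complete.
        var buffer = new BufferBlock<int>();
        downloader.LinkTo(buffer);

        foreach (var url in urls)
            downloader.Post(url);

        downloader.Complete();
        await downloader.Completion;

        // Everything the downloader produced has been forwarded to the buffer by now.
        buffer.TryReceiveAll(out var results);
        return (results ?? new List<int>(), maxInFlight);
    }
}
```

Running it with, say, 20 URLs and maxConcurrent set to 5 should give 20 results and a MaxInFlight that never exceeds 5.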