
How to limit the maximum number of parallel tasks in C#

I have a collection of 1000 input messages to process. I loop over the input collection and start a new task for each message to be processed.

    // Assume this messages collection contains 1000 items
    var messages = new List<string>();

    foreach (var msg in messages)
    {
        Task.Factory.StartNew(() =>
        {
            Process(msg);
        });
    }

Can we estimate the maximum number of messages that are processed simultaneously (assuming a typical quad-core processor), or can we limit the maximum number of messages processed at a time?

How can I ensure that these messages are processed in the same order as they appear in the collection?

Mathiyazhagan asked Apr 12 '16 05:04


2 Answers

SemaphoreSlim is a very good solution in this case, and I highly recommend that the OP try it, but @Manoj's answer has a flaw, as mentioned in the comments: the semaphore should be waited on before spawning the task, like this.

Updated answer: As @Vasyl pointed out, the semaphore may be disposed before the tasks complete, in which case the call to Release() throws an exception. So, before exiting the using block, you must wait for all created tasks to complete.

    int maxConcurrency = 10;
    var messages = new List<string>();

    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> tasks = new List<Task>();
        foreach (var msg in messages)
        {
            concurrencySemaphore.Wait();

            var t = Task.Factory.StartNew(() =>
            {
                try
                {
                    Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });

            tasks.Add(t);
        }

        Task.WaitAll(tasks.ToArray());
    }
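If the work itself can be made asynchronous, the same throttling idea can be written without blocking the calling thread. The following is a minimal sketch, not part of the original answer: `ThrottledRunner`, `RunAsync`, and the `Task.Delay` stand-in for real work are hypothetical names chosen for illustration. It uses `SemaphoreSlim.WaitAsync` in place of `Wait` and `Task.WhenAll` in place of `Task.WaitAll`, and it records the peak number of simultaneous invocations so the limit can be checked.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledRunner
{
    // Hypothetical helper: runs `work` for every item with at most
    // `maxConcurrency` invocations in flight, and returns the highest
    // number of simultaneous invocations that was observed.
    public static async Task<int> RunAsync<T>(
        IEnumerable<T> items, int maxConcurrency, Func<T, Task> work)
    {
        int running = 0, peak = 0;
        var sync = new object();
        var tasks = new List<Task>();

        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            foreach (var item in items)
            {
                // Wait for a free slot before starting the next task.
                await gate.WaitAsync();

                tasks.Add(Task.Run(async () =>
                {
                    lock (sync)
                    {
                        running++;
                        if (running > peak) peak = running;
                    }
                    try
                    {
                        await work(item);
                    }
                    finally
                    {
                        lock (sync) { running--; }
                        gate.Release();
                    }
                }));
            }

            // Finish every task before the semaphore is disposed.
            await Task.WhenAll(tasks);
        }

        return peak;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var messages = Enumerable.Range(1, 20).Select(i => i.ToString()).ToList();

        int peak = await ThrottledRunner.RunAsync(messages, 4, async msg =>
        {
            await Task.Delay(50); // stand-in for real asynchronous work
        });

        Console.WriteLine($"Peak concurrency: {peak}"); // never more than 4
    }
}
```

Because the counter is incremented only after a slot is acquired and decremented before the slot is released, the reported peak cannot exceed `maxConcurrency`.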

Answer to comments: for those who want to see how the semaphore can be disposed when Task.WaitAll is omitted, run the code below in a console app and this exception will be raised.

System.ObjectDisposedException: 'The semaphore has been disposed.'

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
        static void Main(string[] args)
        {
            int maxConcurrency = 5;
            List<string> messages = Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();

            using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
            {
                List<Task> tasks = new List<Task>();
                foreach (var msg in messages)
                {
                    concurrencySemaphore.Wait();

                    var t = Task.Factory.StartNew(() =>
                    {
                        try
                        {
                            Process(msg);
                        }
                        finally
                        {
                            // Throws once the using block has disposed the semaphore.
                            concurrencySemaphore.Release();
                        }
                    });

                    tasks.Add(t);
                }
                // Deliberately commented out to reproduce the exception:
                // Task.WaitAll(tasks.ToArray());
            }
            Console.WriteLine("Exited using block");
            Console.ReadKey();
        }

        private static void Process(string msg)
        {
            Thread.Sleep(2000);
            Console.WriteLine(msg);
        }
    }
ClearLogic answered Sep 23 '22 03:09


You could use Parallel.ForEach and rely on MaxDegreeOfParallelism instead.

    Parallel.ForEach(messages, new ParallelOptions { MaxDegreeOfParallelism = 10 }, msg =>
    {
        // logic
        Process(msg);
    });
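On the ordering part of the question: Parallel.ForEach makes no guarantee that items are processed in collection order, and the semaphore approach only starts tasks in order without controlling when each one finishes. If what matters is that the results line up with the inputs, one option is to write each result into a preallocated array by index. This is a minimal sketch under that assumption, and it also assumes processing returns a value; `OrderedParallel.Map` and the upper-casing transform are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class OrderedParallel
{
    // Hypothetical helper: applies `transform` to every item in parallel,
    // limited to `maxParallelism` workers, and returns the results in the
    // same order as the input list.
    public static TResult[] Map<T, TResult>(
        IReadOnlyList<T> items, int maxParallelism, Func<T, TResult> transform)
    {
        var results = new TResult[items.Count];
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallelism };

        // Each iteration writes only to its own slot, so no locking is needed.
        Parallel.For(0, items.Count, options, i =>
        {
            results[i] = transform(items[i]);
        });

        return results;
    }
}

public static class Program
{
    public static void Main()
    {
        var messages = Enumerable.Range(1, 5).Select(i => "msg" + i).ToList();

        string[] processed = OrderedParallel.Map(messages, 4, m => m.ToUpperInvariant());

        Console.WriteLine(string.Join(",", processed)); // MSG1,MSG2,MSG3,MSG4,MSG5
    }
}
```

The work still runs in an arbitrary order; only the output order is preserved. If the messages must be processed strictly one after another, a plain sequential foreach is the simpler choice, since strict sequencing and parallelism pull in opposite directions.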
Hari Prasad answered Sep 20 '22 03:09