I have a collection of 1000 input messages to process. I'm looping over the input collection and starting a new task for each message to get it processed.
```csharp
// Assume this messages collection contains 1000 items
var messages = new List<string>();
foreach (var msg in messages)
{
    Task.Factory.StartNew(() =>
    {
        Process(msg);
    });
}
```
Can we estimate the maximum number of messages that will be processed simultaneously (assuming a typical quad-core processor), or can we limit the maximum number of messages processed at a time?
How can I ensure that the messages get processed in the same order as the collection?
You don't have to do anything special: Parallel.ForEach() will wait until all its branched tasks are complete. From the calling thread you can treat it as a single synchronous statement and, for instance, wrap it inside a try/catch.
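A minimal sketch of that behaviour, assuming a hypothetical rule that the item "bad" fails. Parallel.ForEach only returns once the loop is over and reports iteration failures as a single AggregateException, so an ordinary try/catch around the call is enough:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ParallelForEachErrors
{
    // Processes every item in parallel; the hypothetical item "bad" throws.
    public static (int Completed, int Failed) Run(string[] items)
    {
        var done = new ConcurrentBag<string>();
        try
        {
            // Does not return until the loop has finished, so the caller can
            // treat it like one synchronous statement.
            Parallel.ForEach(items, item =>
            {
                if (item == "bad")
                    throw new InvalidOperationException($"Cannot process {item}");
                done.Add(item);
            });
            return (done.Count, 0);
        }
        catch (AggregateException ex)
        {
            // Exceptions thrown by the iterations arrive wrapped in one AggregateException.
            return (done.Count, ex.InnerExceptions.Count);
        }
    }
}
```

Once an iteration throws, Parallel.ForEach stops starting new iterations, so `Completed` can be lower than the number of good items.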
The general answer is "Measure, Measure, Measure" :) if you're not experiencing any problems with performance, you shouldn't start optimizing. I'd say 200 tasks are fine though.
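If you do want numbers, a Stopwatch around both versions is the simplest place to start. This is only a sketch: `Process` here is a hypothetical stand-in that sleeps 10 ms, so substitute your real work before drawing any conclusions:

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class MeasureProcessing
{
    // Hypothetical stand-in for the question's Process(msg): sleeps 10 ms
    // to simulate blocking work.
    static void Process(string msg) => Thread.Sleep(10);

    // Times a plain sequential loop and Parallel.ForEach over the same messages.
    public static (TimeSpan SequentialTime, TimeSpan ParallelTime) Compare(string[] messages)
    {
        var sw = Stopwatch.StartNew();
        foreach (var msg in messages)
            Process(msg);
        TimeSpan sequential = sw.Elapsed;

        sw.Restart();
        Parallel.ForEach(messages, msg => Process(msg));
        TimeSpan parallel = sw.Elapsed;

        return (sequential, parallel);
    }
}
```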
Parallel tasks are split into subtasks that are assigned to multiple workers and then executed simultaneously. A worker system can be both parallel and concurrent: it works on several tasks at the same time while also breaking each task down into subtasks that run simultaneously.
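In the TPL this splitting can be made explicit with `Partitioner.Create`, which cuts an index range into chunks that Parallel.ForEach hands to worker threads. A small sketch, where summing 0..n-1 is just an assumed stand-in workload:

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ChunkedSum
{
    // Sums 0..n-1 (n must be > 0) by splitting the range into chunks
    // (subtasks) that Parallel.ForEach hands to worker threads.
    public static long Sum(int n, int chunkSize)
    {
        long total = 0;
        // Partitioner.Create(from, to, rangeSize) yields Tuple<int, int> ranges
        // [Item1, Item2), each at most chunkSize long.
        Parallel.ForEach(Partitioner.Create(0, n, chunkSize), range =>
        {
            long local = 0;
            for (int i = range.Item1; i < range.Item2; i++)
                local += i;
            // Combine this chunk's partial result into the shared total.
            Interlocked.Add(ref total, local);
        });
        return total;
    }
}
```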
There is no parallelism here, as marking a method “async Task” does not automatically make it run in parallel. This will spawn 2 threads, run them simultaneously, and return when both threads are done. This will create a list of Tasks to be run at the same time.
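A sketch of both points using only the BCL: the async method completes on the caller's own thread, while the two `Task.Run` tasks must overlap for the Barrier to let them through:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVsParallel
{
    // 'async' alone does not move work to another thread: awaiting an
    // already-completed task does not yield, so this body runs synchronously
    // on whichever thread called it.
    static async Task<int> CurrentThreadAsync()
    {
        await Task.CompletedTask;
        return Environment.CurrentManagedThreadId;
    }

    // True when the async method ran on the caller's own thread.
    public static bool AsyncRunsOnCallerThread()
    {
        int caller = Environment.CurrentManagedThreadId;
        Task<int> task = CurrentThreadAsync(); // already completed when it returns
        return task.IsCompleted && task.Result == caller;
    }

    // Task.Run queues each delegate to the thread pool, so this list of tasks
    // really does run at the same time. The Barrier only releases a participant
    // once both have arrived; if they ran one after the other, the first
    // SignalAndWait would time out and return false.
    public static bool TasksRunConcurrently()
    {
        using var barrier = new Barrier(2);
        var tasks = new List<Task<bool>>
        {
            Task.Run(() => barrier.SignalAndWait(TimeSpan.FromSeconds(5))),
            Task.Run(() => barrier.SignalAndWait(TimeSpan.FromSeconds(5))),
        };
        Task.WaitAll(tasks.ToArray());
        return tasks.TrueForAll(t => t.Result);
    }
}
```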
SemaphoreSlim is a very good solution in this case, and I highly recommend the OP try it, but @Manoj's answer has a flaw, as mentioned in the comments: the semaphore should be waited on before spawning the task, like this:
Updated answer: As @Vasyl pointed out, the semaphore may be disposed before the tasks complete, in which case calling Release() raises an exception. So you must wait for all the created tasks to complete before exiting the using block.
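```csharp
int maxConcurrency = 10;
var messages = new List<string>();
using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
    List<Task> tasks = new List<Task>();
    foreach (var msg in messages)
    {
        // Block until a slot is free *before* starting the next task.
        concurrencySemaphore.Wait();
        var t = Task.Factory.StartNew(() =>
        {
            try
            {
                Process(msg);
            }
            finally
            {
                // Free the slot even if Process throws.
                concurrencySemaphore.Release();
            }
        });
        tasks.Add(t);
    }
    // Wait for all tasks so the semaphore is not disposed while they still need it.
    Task.WaitAll(tasks.ToArray());
}
```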
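If Process has an async counterpart, the same pattern can be written with `SemaphoreSlim.WaitAsync` and `Task.WhenAll`, so the loop doesn't block a thread while waiting for a slot. This is a variant, not the answer's code: `ProcessAsync` is a hypothetical stand-in, and `Task.Run` replaces `Task.Factory.StartNew` because StartNew with an async lambda returns a `Task<Task>` whose outer task completes at the first await. The peak counter is only there to show that the limit holds:

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledProcessor
{
    static readonly object Gate = new object();
    static int current; // messages being processed right now
    static int peak;    // highest value 'current' has reached

    // Hypothetical async stand-in for Process(msg).
    static async Task ProcessAsync(string msg)
    {
        lock (Gate) { current++; if (current > peak) peak = current; }
        await Task.Delay(20);
        lock (Gate) { current--; }
    }

    // Returns the peak concurrency observed. The static counters make this
    // sketch unsafe for overlapping calls.
    public static async Task<int> RunAsync(IEnumerable<string> messages, int maxConcurrency)
    {
        lock (Gate) { current = 0; peak = 0; }
        using (var semaphore = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = new List<Task>();
            foreach (var msg in messages)
            {
                // Wait asynchronously for a free slot *before* starting the next message.
                await semaphore.WaitAsync();
                tasks.Add(Task.Run(async () =>
                {
                    try { await ProcessAsync(msg); }
                    finally { semaphore.Release(); }
                }));
            }
            // Keep the semaphore alive until every task has released it.
            await Task.WhenAll(tasks);
        }
        lock (Gate) { return peak; }
    }
}
```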
In answer to the comments, for those who want to see how the semaphore can be disposed too early without Task.WaitAll:
Run the code below in a console app and the following exception will be raised:
System.ObjectDisposedException: 'The semaphore has been disposed.'
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        int maxConcurrency = 5;
        List<string> messages = Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();
        using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
        {
            List<Task> tasks = new List<Task>();
            foreach (var msg in messages)
            {
                concurrencySemaphore.Wait();
                var t = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        Process(msg);
                    }
                    finally
                    {
                        concurrencySemaphore.Release();
                    }
                });
                tasks.Add(t);
            }
            // Deliberately commented out: without it, the semaphore is disposed
            // while the last tasks are still running.
            // Task.WaitAll(tasks.ToArray());
        }
        Console.WriteLine("Exited using block");
        Console.ReadKey();
    }

    private static void Process(string msg)
    {
        Thread.Sleep(2000);
        Console.WriteLine(msg);
    }
}
```
You could use Parallel.ForEach and rely on MaxDegreeOfParallelism instead.
```csharp
Parallel.ForEach(messages, new ParallelOptions { MaxDegreeOfParallelism = 10 }, msg =>
{
    // logic
    Process(msg);
});
```
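To check the cap empirically, here is a sketch that counts how many iterations are inside the loop body at once; the 10 ms sleep is a hypothetical stand-in for Process:

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class DegreeOfParallelismCheck
{
    // Returns the highest number of iterations observed running at the same time.
    public static int PeakConcurrency(int itemCount, int maxDegree)
    {
        int current = 0, peak = 0;
        object gate = new object();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };

        Parallel.ForEach(Enumerable.Range(0, itemCount), options, item =>
        {
            lock (gate) { current++; if (current > peak) peak = current; }
            Thread.Sleep(10); // hypothetical stand-in for Process(msg)
            lock (gate) { current--; }
        });
        return peak;
    }
}
```

MaxDegreeOfParallelism is an upper bound, not a target, and Parallel.ForEach makes no promise about the order in which items are processed, so neither this nor the semaphore approach guarantees collection order.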