C# task multi-queue throttling

I need an environment that maintains several task queues, each with a well-defined number of concurrent threads allowed to execute for that queue. Something like this:

  • Queue 1 -> 3 threads;
  • Queue 2 -> 6 threads;

A kind of task system. I have managed to implement this myself using plain old C# code (i.e. System.Threading.Thread, lock and Queue), and it has worked more than fine for over a year. However, I keep reading articles about the wonders of TaskFactory and TaskScheduler, and about these things being possible with built-in .NET classes, but I have failed to find an example proving it. I would like to test it, compare it with what I have right now, and replace my implementation if it works better.

Moreover, I can live without limiting/setting the number of parallel threads per queue, as long as I get the guarantee that an item targeted at queue #2 is executed immediately even if queue #1 is running at full load.

So, my question is: is there something like this in .NET 4 or later, and can someone point me to a sample? I have been looking for one for an entire week and have failed to find anything relevant.

asked Mar 25 '12 by dcg

2 Answers

This is actually pretty trivial using the TPL and the new collections in System.Collections.Concurrent.

For your needs the BlockingCollection<T> is what I would recommend. By default it uses a ConcurrentQueue<T> as the underlying store which is perfect for what you want.

var queue = new BlockingCollection<Message>();

Setting some code to work on those messages, and controlling how many can execute in parallel, is as simple as this:

//Set max parallel Tasks
var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 10
};

Parallel.ForEach(queue.GetConsumingEnumerable(), options, msg =>
{
    //Do some stuff with this message
});

So what is going on here? Well...

The call to GetConsumingEnumerable() will actually block until there is something in the queue to consume. This is great because no extra code is necessary to signal that new work is ready to be done. Rather, as the queue fills up, a new Task running your (anonymous) delegate will be kicked off with an item.
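To make that loop terminate, the producer side calls CompleteAdding() once it is done enqueuing; GetConsumingEnumerable() then ends after the queue drains. A minimal end-to-end sketch of this pattern (the message strings and counts here are just placeholders):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class ConsumerDemo
{
    // Enqueues `count` messages, consumes them with at most `maxParallel`
    // concurrent handlers, and returns how many were processed.
    public static int ProcessAll(int count, int maxParallel)
    {
        var queue = new BlockingCollection<string>();
        int processed = 0;

        // Consumer: GetConsumingEnumerable blocks until items arrive, and
        // completes once CompleteAdding has been called and the queue is empty.
        var consumer = Task.Run(() =>
            Parallel.ForEach(
                queue.GetConsumingEnumerable(),
                new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
                msg => Interlocked.Increment(ref processed)));

        for (int i = 0; i < count; i++)
            queue.Add($"message {i}");
        queue.CompleteAdding();   // signal: no more work is coming

        consumer.Wait();          // returns once every message is handled
        return processed;
    }

    static void Main() => Console.WriteLine(ProcessAll(10, 3));
}
```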

The ParallelOptions object allows you to control how Parallel.ForEach operates. In this case, you are telling it you never want more than 10 Tasks executing at any one time. It is important to note that Tasks != Threads. The details are murky, but needless to say there is a lot of optimization going on under the hood. It's all pluggable, mind you, but that is not for the faint of heart.

There are obviously a lot of details I haven't covered here, but hopefully you can see how simple and expressive using the Task Parallel Library can be.

answered Sep 30 '22 by Josh


You might try creating the equivalent of LimitedConcurrencyLevelTaskScheduler mentioned in this example or look into the priority scheduler mentioned here.
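For reference, here is a condensed sketch of such a scheduler and how it could map onto the two queues from the question. The full sample linked above also handles task inlining; this simplified version refuses inlining altogether, which is safe but means tasks should not synchronously wait on each other:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Simplified take on the LimitedConcurrencyLevelTaskScheduler from the
// Microsoft parallel samples: tasks queued to an instance never run on
// more than maxDegreeOfParallelism ThreadPool threads at once.
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>();
    private readonly int _maxDegreeOfParallelism;
    private int _delegatesQueuedOrRunning;

    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    protected override void QueueTask(Task task)
    {
        lock (_tasks)
        {
            _tasks.AddLast(task);
            // Only spin up a new worker if we are below the limit.
            if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
            {
                _delegatesQueuedOrRunning++;
                NotifyThreadPoolOfPendingWork();
            }
        }
    }

    private void NotifyThreadPoolOfPendingWork()
    {
        ThreadPool.UnsafeQueueUserWorkItem(_ =>
        {
            // Drain queued tasks until none are left, then retire.
            while (true)
            {
                Task item;
                lock (_tasks)
                {
                    if (_tasks.Count == 0) { _delegatesQueuedOrRunning--; break; }
                    item = _tasks.First.Value;
                    _tasks.RemoveFirst();
                }
                TryExecuteTask(item);
            }
        }, null);
    }

    // Inline execution is disallowed in this simplified version.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        lock (_tasks) return _tasks.ToArray();
    }

    public override int MaximumConcurrencyLevel => _maxDegreeOfParallelism;
}

class SchedulerDemo
{
    static void Main()
    {
        // One factory per queue, each with its own concurrency limit.
        var factory1 = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(3)); // queue 1
        var factory2 = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(6)); // queue 2

        var t1 = factory1.StartNew(() => Console.WriteLine("queue 1 work"));
        var t2 = factory2.StartNew(() => Console.WriteLine("queue 2 work"));
        Task.WaitAll(t1, t2);
    }
}
```

Because each queue gets its own scheduler instance, work submitted through factory2 starts promptly even when the queue-1 scheduler is saturated, which matches the guarantee asked for in the question.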

answered Sep 30 '22 by Jeff Moser