
Best way to limit the number of active Tasks running via the Task Parallel Library

Consider a queue holding a lot of jobs that need processing. The queue's limitation is that it can only hand out one job at a time, and there is no way of knowing how many jobs there are. The jobs take 10s to complete and involve a lot of waiting for responses from web services, so the work is not CPU bound.

If I use something like this

while (true)
{
   var job = Queue.PopJob();
   if (job == null)
      break;
   Task.Factory.StartNew(job.Execute); 
}

Then it will furiously pop jobs from the queue much faster than it can complete them, run out of memory and fall on its ass. >.<

I can't use (I don't think) ParallelOptions.MaxDegreeOfParallelism, because I can't use Parallel.Invoke or Parallel.ForEach.

3 alternatives I've found

  1. Replace Task.Factory.StartNew with

    Task task = new Task(job.Execute, TaskCreationOptions.LongRunning);
    task.Start();
    

    Which seems to somewhat solve the problem, but I am not clear exactly what this is doing or whether it is the best method.

  2. Create a custom task scheduler that limits the degree of concurrency

  3. Use something like BlockingCollection to add jobs to collection when started and remove when finished to limit number that can be running.

With #1 I've got to trust that the right decision is made automatically; with #2/#3 I've got to work out the maximum number of tasks that can be running myself.

Have I understood this correctly - which is the better way, or is there another way?

EDIT - This is what I've come up with from the answers below: a producer-consumer pattern.

As well as overall throughput, the aim was not to dequeue jobs faster than they could be processed, and not to have multiple threads polling the queue (not shown here, but that's a non-blocking op and would lead to huge transaction costs if polled at high frequency from multiple places).

// BlockingCollection<>(1) will block if we try to add more than 1 job to the
// collection (no point in being greedy!), or if it is empty on take.
BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);

// Set up a number of consumer threads.
// Determine MAX_CONSUMER_THREADS empirically: on a 4-core CPU where 50% of a
// job's time is spent blocked waiting on IO, 8 is a likely starting point.
var consumers = new List<Thread>();
for (int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
   Thread consumer = new Thread(() =>
   {
      // GetConsumingEnumerable() blocks while the collection is empty and
      // ends cleanly once CompleteAdding() is called and the collection
      // drains, avoiding the race where Take() throws after completion.
      foreach (var job in jobs.GetConsumingEnumerable())
         job.Execute();
   });
   consumer.Start();
   consumers.Add(consumer);
}

// Producer: take items off the queue and put them in the blocking collection,
// ready for processing.
while (true)
{
    var job = Queue.PopJob();
    if (job != null)
       jobs.Add(job);
    else
    {
       jobs.CompleteAdding();
       // Wait for in-flight jobs to finish.
       foreach (var consumer in consumers)
          consumer.Join();
       break;
    }
}
asked Jun 21 '12 by Ryan



2 Answers

I just gave an answer which is very applicable to this question.

Basically, the TPL Task class is made to schedule CPU-bound work. It is not made for blocking work.

You are working with a resource that is not CPU: waiting for service replies. This means the TPL will mismanage your resource because it assumes CPU-boundedness to a certain degree.

Manage the resources yourself: Start a fixed number of threads or LongRunning tasks (which is basically the same). Decide on the number of threads empirically.

You can't put unreliable systems into production. For that reason, I recommend #1, but throttled. Don't create as many threads as there are work items; create as many threads as are needed to saturate the remote service. Write yourself a helper function which spawns N threads and uses them to process M work items. You get totally predictable and reliable results that way. A sketch of such a helper is below.
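
A minimal sketch of what such a helper could look like (my illustration, not usr's code: the JobRunner and ProcessOnDedicatedThreads names are made up, and it assumes the question's Job type with an Execute() method):

using System.Collections.Generic;
using System.Threading;

static class JobRunner
{
    // Processes all the given work items on exactly workerCount dedicated
    // threads, then blocks until every item has been executed.
    public static void ProcessOnDedicatedThreads(IReadOnlyList<Job> items, int workerCount)
    {
        int cursor = -1; // shared index into items, advanced atomically
        var workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = new Thread(() =>
            {
                int index;
                while ((index = Interlocked.Increment(ref cursor)) < items.Count)
                    items[index].Execute();
            });
            workers[i].Start();
        }
        foreach (var worker in workers)
            worker.Join();
    }
}

Because workerCount is fixed, memory use stays bounded no matter how many items are queued.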

answered Oct 21 '22 by usr


Potential flow splits and continuations caused by await, later on in your code or in a 3rd party library, won't play nicely with long running tasks (or threads), so don't bother using long running tasks. In the async/await world, they're useless. More details here.
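
A quick console sketch of that behaviour (my example, not from the answer): the code after the first await resumes on an ordinary pool thread, so the dedicated LongRunning thread has already been wasted.

using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        Task.Factory.StartNew(async () =>
        {
            // This runs on the dedicated (non-pool) LongRunning thread.
            Console.WriteLine("before await, pool thread: " + Thread.CurrentThread.IsThreadPoolThread);
            await Task.Delay(100);
            // The continuation is scheduled to the thread pool; the
            // dedicated thread exited at the first await.
            Console.WriteLine("after await, pool thread: " + Thread.CurrentThread.IsThreadPoolThread);
        }, TaskCreationOptions.LongRunning).Unwrap().Wait();
    }
}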

You can call ThreadPool.SetMaxThreads but before you make this call, make sure you set the minimum number of threads with ThreadPool.SetMinThreads, using values below or equal to the max ones. And by the way, the MSDN documentation is wrong. You CAN go below the number of cores on your machine with those method calls, at least in .NET 4.5 and 4.6 where I used this technique to reduce the processing power of a memory limited 32 bit service.
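
For example (the thread counts here are purely illustrative; measure to find yours):

using System.Threading;

// Order matters: set the minimums first, because SetMaxThreads fails
// if the requested maximums are below the current minimums.
ThreadPool.SetMinThreads(workerThreads: 2, completionPortThreads: 2);
ThreadPool.SetMaxThreads(workerThreads: 8, completionPortThreads: 8);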

If however you don't wish to restrict the whole app but just the processing part of it, a custom task scheduler will do the job. A long time ago, MS released samples with several custom task schedulers, including a LimitedConcurrencyLevelTaskScheduler. Spawn the main processing task manually with Task.Factory.StartNew, providing the custom task scheduler, and every other task spawned by it will use it, including async/await and even Task.Yield, used to achieve asynchrony early on in an async method.
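
Wiring that up could look like this (a sketch: it assumes LimitedConcurrencyLevelTaskScheduler has been copied in from Microsoft's ParallelExtensionsExtras samples, and ProcessAllJobs is a stand-in for your own top-level processing method):

using System.Threading;
using System.Threading.Tasks;

// Allow at most 4 tasks scheduled through this scheduler to run at once.
var scheduler = new LimitedConcurrencyLevelTaskScheduler(4);

// Tasks spawned from inside ProcessAllJobs inherit this scheduler by
// default, as long as they don't explicitly specify another one.
Task processing = Task.Factory.StartNew(
    ProcessAllJobs,
    CancellationToken.None,
    TaskCreationOptions.None,
    scheduler);
processing.Wait();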

But for your particular case, neither solution will stop your queue of jobs from being drained faster than the jobs complete. That might not be desirable, depending on the implementation and purpose of that queue of yours. They are more like "fire a bunch of tasks and let the scheduler find the time to execute them" type solutions. So perhaps something a bit more appropriate here is a stricter method of control over the execution of the jobs, via a semaphore. The code would look like this:

var semaphore = new SemaphoreSlim(max_concurrent_jobs);

while(...){
 var job = Queue.PopJob();
 semaphore.Wait(); // blocks once max_concurrent_jobs are already in flight
 ProcessJobAsync(job);
}

async Task ProcessJobAsync(Job job){
 await Task.Yield();
 try {
  ... Process the job here...
 }
 finally {
  semaphore.Release(); // free the slot even if the job throws
 }
}
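
One refinement worth noting (my addition, not part of the answer): if the popping loop itself lives in an async method, await semaphore.WaitAsync(); keeps the producer from blocking a thread while it waits for a free slot. SemaphoreSlim has offered WaitAsync since .NET 4.5.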

There's more than one way to skin a cat. Use what you believe is appropriate.

answered Oct 21 '22 by MoonStom