Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Processing only n items at a time concurrently using Task Parallel Library

This is all happening in a windows service.

I have a Queue<T> (actually a ConcurrentQueue<T>) holding items waiting to be processed. But, I don't want to process only one at a time, I want to process n items concurrently, where n is a configurable integer.

How do I go about doing this using the Task Parallel Library?

I know that TPL will partition collections on behalf of the developer for concurrent processing, but not sure if that's the feature that I'm after. I'm new to multithreading and TPL.

like image 571
Ronnie Overby Avatar asked Aug 04 '11 18:08

Ronnie Overby


1 Answers

Here is one idea that involves creating an extension method for TaskFactory.

public static class TaskFactoryExtension
{
    public static Task StartNew(this TaskFactory target, Action action, int parallelism)
    {
        var tasks = new Task[parallelism];
        for (int i = 0; i < parallelism; i++)
        {
            tasks[i] = target.StartNew(action);
        }
        return target.StartNew(() => Task.WaitAll(tasks));
    }
}

Then your calling code would look like the following.

ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
var task = Task.Factory.StartNew(
  () =>
  {
    T item;
    while (queue.TryDequeue(out item))
    {
      ProcessItem(item);
    }
  }, n);
task.Wait(); // Optionally wait for everything to finish.

Here is another idea using Parallel.ForEach. The problem with this approach is that your degrees of parallelism might not necessarily be honored. You are only indicating the maximum amount allowed and not the absolute amount.

ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n },
  (item) =>
  {
    ProcessItem(item);    
  });
like image 72
Brian Gideon Avatar answered Oct 07 '22 23:10

Brian Gideon