 

C# Parallel.ForEach: finishing all tasks equally

I'm using C#'s Parallel.ForEach to process more than a thousand subsets of data. One set takes 5-30 minutes to process, depending on its size. On my computer, with the option

ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount;

I get 8 parallel processes. As I understand it, the jobs are divided evenly between the parallel tasks (e.g. the first task gets jobs 1, 9, 17, etc., the second gets 2, 10, 18, etc.); therefore, one task can finish its own jobs sooner than the others, because its sets of data happen to take less time to process.

The problem is that four of the parallel tasks finish their jobs within 24 hours, but the last one takes 48 hours. Is there some way to organize the parallelism so that all the tasks finish at about the same time, that is, so that all parallel tasks keep working until all jobs are done?
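For what it's worth, the TPL already ships a load-balancing alternative to the default striped partitioning: `Partitioner.Create(list, loadBalance: true)` hands out work in small chunks on demand, so a worker that finishes early grabs the next chunk instead of idling. A minimal sketch (the 100 dummy jobs and the sleep-based "work" are made up for illustration):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class Demo
{
    public static int ProcessAll()
    {
        var jobs = new List<int>();
        for (int n = 0; n < 100; n++) jobs.Add(n);

        var po = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

        // loadBalance: true hands out small chunks on demand instead of
        // pre-assigning each worker a fixed stripe of the list.
        var partitioner = Partitioner.Create(jobs, loadBalance: true);

        int processed = 0;
        Parallel.ForEach(partitioner, po, job =>
        {
            Thread.Sleep(job % 5);                // simulated uneven work
            Interlocked.Increment(ref processed);
        });
        return processed;
    }

    public static void Main() => Console.WriteLine(ProcessAll()); // prints 100
}
```

This keeps the familiar `Parallel.ForEach` shape; only the source collection is wrapped in a partitioner.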

Allan asked Mar 06 '13

3 Answers

Since the jobs are not equal in size, you can't split the number of jobs evenly between processors and expect them to finish at about the same time. I think what you need here is 8 worker threads that each retrieve the next job in line. You will have to use a lock in the function that hands out the next job.

Somebody correct me if I'm wrong, but off the top of my head, a worker thread could be given a function like this:

public void ProcessJob()
{
    for (Job myJob = GetNextJob(); myJob != null; myJob = GetNextJob())
    {
        // process job
    }
}

And the function to get the next job would look like:

private List<Job> jobs;
private int currentJob = 0;

private Job GetNextJob()
{
    lock (jobs)
    {
        Job job = null;
        if (currentJob < jobs.Count)
        {
            job = jobs[currentJob];
            currentJob++;
        }
        return job;
    }
}
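Putting the two snippets together, a minimal driver might look like this. The `Job` class, its `Cost` field, and the sleep-based work are stand-ins, since the question doesn't show the real job type:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class Job { public int Cost; }   // hypothetical stand-in job type

public class Worker
{
    private List<Job> jobs;
    private int currentJob = 0;
    public int Processed = 0;

    public Worker(List<Job> jobs) { this.jobs = jobs; }

    private Job GetNextJob()
    {
        lock (jobs)
        {
            Job job = null;
            if (currentJob < jobs.Count)
            {
                job = jobs[currentJob];
                currentJob++;
            }
            return job;
        }
    }

    public void ProcessJob()
    {
        for (Job myJob = GetNextJob(); myJob != null; myJob = GetNextJob())
        {
            Thread.Sleep(myJob.Cost);              // stand-in for the real work
            Interlocked.Increment(ref Processed);
        }
    }
}

public class WorkerDemo
{
    public static int Run()
    {
        var jobs = new List<Job>();
        for (int n = 0; n < 40; n++) jobs.Add(new Job { Cost = n % 5 });

        var worker = new Worker(jobs);
        var threads = new Thread[Environment.ProcessorCount];
        for (int t = 0; t < threads.Length; t++)
        {
            threads[t] = new Thread(worker.ProcessJob);
            threads[t].Start();
        }
        foreach (var t in threads) t.Join();   // wait for every worker
        return worker.Processed;
    }

    public static void Main() => Console.WriteLine(Run()); // prints 40
}
```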
Matt Miller answered Sep 18 '22


It seems there is no ready-to-use solution, so one has to be built.

My previous code was:

var ListOfSets = (from x in Database
                  group x by x.SetID into z
                  select new { ID = z.Key }).ToList();

ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount;

Parallel.ForEach(ListOfSets, po, SingleSet=>
{
     AnalyzeSet(SingleSet.ID);
});

To share the work equally between all CPUs, I still use Parallel to do the work, but instead of ForEach I use For, together with the idea from Matt. The new code is:

Parallel.For(0, Environment.ProcessorCount, i =>
{
    while (true)
    {
        double SetID;
        lock (ListOfSets)
        {
            // Check and remove inside the lock; otherwise another thread
            // could empty the list between the check and the access.
            if (ListOfSets.Count == 0)
                break;
            SetID = ListOfSets[0].ID;
            ListOfSets.RemoveAt(0);
        }
        AnalyzeSet(SetID);
    }
});
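The same hand-rolled scheme can be written without an explicit lock by putting the set IDs in a `ConcurrentQueue<T>`, whose `TryDequeue` checks and removes atomically. `AnalyzeSet` here is a hypothetical stand-in that just counts, and the 50 IDs are made up for illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class QueueDemo
{
    static int analyzed = 0;

    static void AnalyzeSet(double setId)       // stand-in for the real analysis
    {
        Interlocked.Increment(ref analyzed);
    }

    public static int Run()
    {
        var setIds = new ConcurrentQueue<double>();
        for (int n = 0; n < 50; n++) setIds.Enqueue(n);

        Parallel.For(0, Environment.ProcessorCount, i =>
        {
            // TryDequeue is atomic: the check and the removal happen as one
            // step, so no worker can see an item and then have it vanish.
            while (setIds.TryDequeue(out double setId))
            {
                AnalyzeSet(setId);
            }
        });
        return analyzed;
    }

    public static void Main() => Console.WriteLine(Run()); // prints 50
}
```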

So, thank you for your advice.

Allan answered Sep 19 '22


One option, as suggested by others, is to manage your own producer-consumer queue. I'd like to note that `BlockingCollection<T>` makes this very easy to do.

BlockingCollection<JobData> queue = new BlockingCollection<JobData>();

//add data to queue; if it can be done quickly, just do it inline.  
//If it's expensive, start a new task/thread just to add items to the queue.
foreach (JobData job in data)
    queue.Add(job);

queue.CompleteAdding();

for (int i = 0; i < Environment.ProcessorCount; i++)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var job in queue.GetConsumingEnumerable())
        {
            ProcessJob(job);
        }
    }, TaskCreationOptions.LongRunning);
}
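One detail worth noting: the loop above starts the workers but never waits for them. A variant that keeps the task handles and blocks until the queue is drained might look like this; `ProcessJob` and the 30 dummy items are stand-ins for illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class BlockingDemo
{
    static int done = 0;

    static void ProcessJob(int job)            // stand-in for the real work
    {
        Interlocked.Increment(ref done);
    }

    public static int Run()
    {
        var queue = new BlockingCollection<int>();
        for (int n = 0; n < 30; n++) queue.Add(n);
        queue.CompleteAdding();                // no more items will arrive

        var workers = new Task[Environment.ProcessorCount];
        for (int i = 0; i < workers.Length; i++)
        {
            workers[i] = Task.Factory.StartNew(() =>
            {
                // GetConsumingEnumerable blocks while the queue is empty and
                // ends once CompleteAdding was called and the queue is drained.
                foreach (var job in queue.GetConsumingEnumerable())
                    ProcessJob(job);
            }, TaskCreationOptions.LongRunning);
        }
        Task.WaitAll(workers);                 // block until every worker is done
        return done;
    }

    public static void Main() => Console.WriteLine(Run()); // prints 30
}
```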
Servy answered Sep 19 '22