How to use Threads for Processing Many Tasks

I have a C# requirement for individually processing a 'great many' (perhaps > 100,000) records. Running this process sequentially is proving to be very slow, with each record taking a good second or so to complete (and a timeout set at 5 seconds).

I would like to try running these tasks asynchronously by using a set number of worker 'threads' (I use the term 'thread' here cautiously as I am not sure if I should be looking at a thread, or a task or something else).

I have looked at the ThreadPool, but I can't imagine it could queue the volume of requests required. My ideal pseudo code would look something like this...

public void ProcessRecords() {
    SetMaxNumberOfThreads(20);
    MyRecord rec;
    while ((rec = GetNextRecord()) != null) {
        var task = WaitForNextAvailableThreadFromPool(ProcessRecord(rec));
        task.Start();
    }
}

I will also need a mechanism that the processing method can report back to the parent/calling class.

Can anyone point me in the right direction with perhaps some example code?

asked Jan 08 '15 13:01 by Neilski


1 Answer

A simple solution is to use TPL Dataflow, a higher-level abstraction over the TPL with options for the degree of parallelism and so forth. You create a block (an ActionBlock in this case), Post every record to it, signal completion, and wait asynchronously for it to finish; TPL Dataflow handles the rest for you:

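// Requires: using System.Threading.Tasks.Dataflow; (must run inside an async method)
// Process at most 20 records concurrently.
var block = new ActionBlock<MyRecord>(
    r => ProcessRecord(r),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 20 });

MyRecord rec;
while ((rec = GetNextRecord()) != null)
{
    block.Post(rec);
}

// No more records will be posted; wait until every queued record is processed.
block.Complete();
await block.Completion;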

Another benefit is that the block starts working as soon as the first record arrives and not only when all the records have been received.
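To see the throttling in action, here is a self-contained sketch of the snippet above. The MyRecord class, the ProcessRecord body (a short Thread.Sleep standing in for the real one-second job), and the ThrottledProcessing/RunAsync names are hypothetical, and it assumes the System.Threading.Tasks.Dataflow package is referenced. It tracks how many records are in flight at once, so you can check that the peak never exceeds MaxDegreeOfParallelism.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the question's record type.
public sealed class MyRecord
{
    public MyRecord(int id) { Id = id; }
    public int Id { get; }
}

public static class ThrottledProcessing
{
    private static int _inFlight;   // records currently being processed
    private static int _peak;       // highest concurrency observed
    private static int _processed;  // records finished

    // Hypothetical ProcessRecord: sleeps to simulate slow per-record work.
    private static void ProcessRecord(MyRecord rec)
    {
        int now = Interlocked.Increment(ref _inFlight);
        // Update the observed peak with a compare-and-swap loop.
        int seen;
        while (now > (seen = Volatile.Read(ref _peak)) &&
               Interlocked.CompareExchange(ref _peak, now, seen) != seen) { }

        Thread.Sleep(5);

        Interlocked.Decrement(ref _inFlight);
        Interlocked.Increment(ref _processed);
    }

    public static async Task<(int Processed, int Peak)> RunAsync(int count, int maxParallel)
    {
        _inFlight = 0; _peak = 0; _processed = 0;

        var block = new ActionBlock<MyRecord>(
            r => ProcessRecord(r),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        // Stand-in for the question's GetNextRecord() loop.
        for (int i = 0; i < count; i++)
        {
            block.Post(new MyRecord(i));
        }

        block.Complete();
        await block.Completion;
        return (_processed, _peak);
    }

    public static async Task Main()
    {
        var (processed, peak) = await RunAsync(200, 20);
        Console.WriteLine($"Processed {processed} records, peak concurrency {peak}");
    }
}
```

By default the block's input buffer is unbounded, so posting 100,000 records up front only costs the memory for the queued records themselves. If that matters, setting BoundedCapacity in the options and using await block.SendAsync(rec) instead of Post makes the producer wait for free space (Post on a full bounded block returns false rather than waiting).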

If you need to report back on each record, you can use a TransformBlock to do the actual processing and link it to an ActionBlock that handles the reports:

var transform = new TransformBlock<MyRecord, Report>(r =>
{
    ProcessRecord(r);
    return GenerateReport(r);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 20 });

var reporter = new ActionBlock<Report>(report =>
{
    RaiseEvent(report); // Or any other mechanism...
});

// Completing transform also completes reporter once it has drained.
transform.LinkTo(reporter, new DataflowLinkOptions { PropagateCompletion = true });

MyRecord rec;
while ((rec = GetNextRecord()) != null)
{
    transform.Post(rec);
}

transform.Complete();
// Await the last block in the pipeline so that every report has been handled.
await reporter.Completion;
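A runnable sketch of the reporting pipeline follows. The MyRecord and Report classes are hypothetical, and ProcessAndReport is a hypothetical method that merges the answer's ProcessRecord and GenerateReport into one step. The reporter simply collects reports into a List; in a real application this is where you would raise an event or update the caller. It assumes the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical record and report types standing in for the question's.
public sealed class MyRecord
{
    public MyRecord(int id) { Id = id; }
    public int Id { get; }
}

public sealed class Report
{
    public Report(int recordId, string status) { RecordId = recordId; Status = status; }
    public int RecordId { get; }
    public string Status { get; }
}

public static class ReportingPipeline
{
    // Hypothetical per-record work followed by report generation.
    private static Report ProcessAndReport(MyRecord rec)
    {
        Thread.Sleep(5); // simulate slow processing
        return new Report(rec.Id, "ok");
    }

    public static async Task<List<Report>> RunAsync(int count, int maxParallel)
    {
        var reports = new List<Report>();

        // Processing runs up to maxParallel records at a time.
        var transform = new TransformBlock<MyRecord, Report>(
            r => ProcessAndReport(r),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });

        // Default MaxDegreeOfParallelism is 1, so this delegate never runs
        // concurrently with itself and can add to the List without a lock.
        var reporter = new ActionBlock<Report>(report => reports.Add(report));

        transform.LinkTo(reporter, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            transform.Post(new MyRecord(i));
        }

        transform.Complete();
        // Awaiting the last block guarantees every report has been handled.
        await reporter.Completion;
        return reports;
    }

    public static async Task Main()
    {
        var reports = await RunAsync(100, 20);
        Console.WriteLine($"Received {reports.Count} reports");
    }
}
```

Two design points: the reporter keeps the default MaxDegreeOfParallelism of 1, so report handling is serialized even though processing runs 20 at a time, and TransformBlock emits its outputs in input order by default (the EnsureOrdered option), so reports arrive in record order.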
answered Sep 30 '22 11:09 by i3arnon