Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

`Parallel.ForEach` with final step in defined order

I'm looking for a "neat" and efficient way to achieve a combination of a long step 1 (which may be parallelised) followed by a step 2 which needs to be in original order (and if possible minimising the amount of data from the first step held in RAM) whilst allowing second steps to start as soon as the data from the step 1 for the first object is available, alongside step2 for further data.

To put flesh onto this, and make it clearer, I'm needing to compress a large number of images (slow - Step 1), followed by sending each in order (step 2) over a network connection. Limiting the number of chunks of prepared compressed data in RAM at any stage is also important, so for instance if sending 1000 images, I'd like to limit the number of "finished" but unsent images to (say) the number of threads/processors used.

I've done a "hand-written" version of this, using an array of Task objects but it seems quite messy, and I'm sure other people must have similar needs, so is there a more "standard" way of doing this? Ideally, I'd like a variation on Parallel.ForEach with 2 delegates - one for Step 1 and one for Step 2, and I hoped that one of the standard overrides such as those including a "localFinal" parameter might have helped, but in turns out those final stages are "per Thread", not "per delegate".

Can anyone point me at an existing neat way to achieve this?

like image 349
medconn Avatar asked Sep 26 '22 21:09

medconn


1 Answers

You could use a combination of Plinq (with WithDegreeOfParallelism() to limit concurrency in stage one), along with BlockingCollection for the completed block. Also, note that it uses AsOrdered() to preserve the original order.

The following example demonstrates. For your actual application, you'd replace the int work items shown here with your work item type - either a filename, or a class with information relating to each work item.

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    static class Program
    {
        static void Main()
        {
            int maxThreads = 4;
            int maxOutputQueueSize = 10;
            var workItems = Enumerable.Range(1, 100); // Pretend these are your files
            var outputQueue = new BlockingCollection<int>(maxOutputQueueSize);
            var worker = Task.Run(() => output(outputQueue));

            var parallelWorkItems = 
                workItems
                .AsParallel()
                .AsOrdered()
                .WithDegreeOfParallelism(maxThreads)
                .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                .Select(process);

            foreach (var item in parallelWorkItems)
                outputQueue.Add(item);

            outputQueue.CompleteAdding();
            worker.Wait();

            Console.WriteLine("Done.");
        }

        static int process(int value) // Pretend that this compresses the data.
        {
            Console.WriteLine($"Worker {Thread.CurrentThread.ManagedThreadId} is processing {value}");
            Thread.Sleep(250);  // Simulate slow operation.
            return value; // Return updated work item.
        }

        static void output(BlockingCollection<int> queue)
        {
            foreach (var item in queue.GetConsumingEnumerable())
                Console.WriteLine($"Output is processing {item}");

            Console.WriteLine("Finished outputting.");
        }
    }
}

Note how you can limit both the input queue processing (via WithDegreeOfParallelism) and the size of the output queue (with the maxOutputQueueSize parameter).

Alternatively, if you're using .Net 4.5 or later, you could look into the TPL Dataflow library which has a lot of support for this kind of thing. I'd recommend using that if you can - but it's a bit too much to describe in an answer here.

like image 54
Matthew Watson Avatar answered Sep 29 '22 04:09

Matthew Watson