Possible reasons why ParallelQuery.Aggregate does not run in parallel

I would appreciate any help from the PLINQ experts out there! I will take time to review answers; I have a more established profile on math.SE.

I have an object of type ParallelQuery<List<string>>, which has 44 lists which I would like to process in parallel (five at a time, say). My process has a signature like

    private ProcessResult Process(List<string> input)

The processing will return a result, which is a pair of Boolean values, as below.

    private struct ProcessResult
    {
        public ProcessResult(bool initialised, bool successful)
        {
            ProcessInitialised = initialised;
            ProcessSuccessful = successful;
        }

        public bool ProcessInitialised { get; }
        public bool ProcessSuccessful { get; }
    }

The problem. Given an IEnumerable<List<string>> processMe, my PLINQ query tries to use this overload: https://msdn.microsoft.com/en-us/library/dd384151(v=vs.110).aspx. It is written as

    processMe.AsParallel()
             .Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult>
                 (
                     new ConcurrentStack<ProcessResult>(), // aggregator seed
                     (agg, input) =>
                     {                                     // update the aggregate result
                         var res = Process(input);
                         agg.Push(res);
                         return agg;
                     },
                     agg =>
                     {                                     // obtain the result from the aggregator agg
                         ProcessResult res;                // (in this case just the most recent result**)
                         agg.TryPop(out res);
                         return res;
                     }
                 );

Unfortunately it does not run in parallel, only sequentially. (** Note that this implementation doesn't make "sense"; I am just trying to get the parallelisation to work for now.)


I tried a slightly different implementation, which did run in parallel, but there was no aggregation. I defined an aggregation method (which is essentially a Boolean AND on both parts of ProcessResult, i.e. aggregate([A1, A2], [B1, B2]) ≡ [A1 && B1, A2 && B2]).

    private static ProcessResult AggregateProcessResults(
        ProcessResult aggregate, ProcessResult latest)
    {
        bool ini = false, suc = false;
        if (aggregate.ProcessInitialised && latest.ProcessInitialised)
            ini = true;
        if (aggregate.ProcessSuccessful && latest.ProcessSuccessful)
            suc = true;

        return new ProcessResult(ini, suc);
    }

And used this PLINQ overload: https://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx

    processMe.AsParallel()
             .Aggregate<List<string>, ProcessResult, ProcessResult>(
                 new ProcessResult(true, true),
                 (res, input)  => Process(input),
                 (agg, latest) => AggregateProcessResults(agg, latest),
                 agg           => agg);

The problem here was that the AggregateProcessResults code was never hit for some reason; I have no idea where the results were going...

Thanks for reading, any help appreciated :)

asked Nov 21 '17 at 18:11 by Szmagpie


1 Answer

The overload of Aggregate you are using will indeed not run in parallel, by design. You pass a seed and then a step function, but the first argument of the step function (agg) is the accumulator returned by the previous step. Because the result of each step is the input to the next, the computation is inherently sequential and cannot be parallelized. I'm not sure why this overload is included in ParallelEnumerable, but there was probably a reason.
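A minimal sketch illustrating this, using a toy workload instead of the question's Process: it sums 1..44 with a 5 ms sleep standing in for the real work. The names SequentialFoldDemo, Step and Run are made up for the example. Step records how many calls to itself are in flight at once; with the seed overload, PLINQ applies the step function to one element at a time, so the observed overlap should never exceed 1, whereas the overloads that also take a combine function let each partition fold its own elements concurrently.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class SequentialFoldDemo
{
    private static readonly object Gate = new object();
    private static int _active;    // Step calls currently running
    private static int _maxActive; // highest overlap observed

    // Toy step function: the new accumulator depends on the previous one,
    // just like agg in the question.
    private static int Step(int acc, int x)
    {
        lock (Gate)
        {
            _active++;
            if (_active > _maxActive) _maxActive = _active;
        }
        Thread.Sleep(5); // stand-in for the real per-element work
        lock (Gate) { _active--; }
        return acc + x;
    }

    public static (int Sum, int MaxOverlap) Run()
    {
        _active = 0;
        _maxActive = 0;
        int sum = Enumerable.Range(1, 44)
            .AsParallel()
            .Aggregate(0, (acc, x) => Step(acc, x)); // seed overload: folded one element at a time
        return (sum, _maxActive);
    }

    public static void Main()
    {
        var (sum, maxOverlap) = Run();
        Console.WriteLine($"sum = {sum}, max concurrent Step calls = {maxOverlap}");
    }
}
```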

Instead, use another overload:

var result = processMe
.AsParallel()
.Aggregate
(
    // seed factory. Each partition will call this to get its own seed
    () => new ConcurrentStack<ProcessResult>(),
    // process element and update accumulator
    (agg, input) =>
    {                                           
        var res = Process(input);
        agg.Push(res);
        return agg;
    },
    // combine accumulators from different partitions
    (agg1, agg2) => {
        agg1.PushRange(agg2.ToArray());
        return agg1;
    },
    // reduce
    agg =>
    {
        ProcessResult res;
        agg.TryPop(out res);
        return res;
    }
);
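If the real goal is the element-wise AND described in the question rather than a stack of individual results, the same seed-factory overload can fold ProcessResult values directly. A minimal sketch, assuming a hypothetical Process that treats a list as "initialised" when it is non-empty and "successful" when none of its strings is empty (substitute the real one); And and ProcessAll are illustrative names. The seed (true, true) is the identity for AND, so every partition can safely start from it. Note that the step function combines the new result with acc: the question's second attempt, (res, input) => Process(input), ignores res, so each partition's accumulator only keeps the result of the last element it processed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class AndAggregationSketch
{
    // Mirrors the ProcessResult struct from the question.
    public struct ProcessResult
    {
        public ProcessResult(bool initialised, bool successful)
        {
            ProcessInitialised = initialised;
            ProcessSuccessful = successful;
        }
        public bool ProcessInitialised { get; }
        public bool ProcessSuccessful { get; }
    }

    // Hypothetical stand-in for the question's Process; replace with the real work.
    public static ProcessResult Process(List<string> input) =>
        new ProcessResult(input.Count > 0, input.All(s => s.Length > 0));

    // Element-wise AND, equivalent to the question's AggregateProcessResults.
    public static ProcessResult And(ProcessResult a, ProcessResult b) =>
        new ProcessResult(a.ProcessInitialised && b.ProcessInitialised,
                          a.ProcessSuccessful && b.ProcessSuccessful);

    public static ProcessResult ProcessAll(IEnumerable<List<string>> processMe) =>
        processMe
            .AsParallel()
            .WithDegreeOfParallelism(5) // "five at a time"
            .Aggregate<List<string>, ProcessResult, ProcessResult>(
                () => new ProcessResult(true, true),      // seed factory: one AND identity per partition
                (acc, input) => And(acc, Process(input)), // fold each element into the accumulator
                (acc1, acc2) => And(acc1, acc2),          // combine partition accumulators
                acc => acc);                              // nothing left to reduce

    public static void Main()
    {
        var ok = Enumerable.Range(0, 44).Select(i => new List<string> { "item" + i }).ToList();
        var bad = new List<List<string>>(ok) { new List<string> { "" } };
        Console.WriteLine(ProcessAll(ok).ProcessSuccessful);  // True
        Console.WriteLine(ProcessAll(bad).ProcessSuccessful); // False
    }
}
```

Because AND is associative and commutative, the order in which PLINQ combines the partition results does not affect the outcome.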
answered Sep 19 '22 at 13:09 by Evk