I would appreciate any help from the PLINQ experts out there! I will take time reviewing answers; I have a more established profile on math.SE.
I have an object of type ParallelQuery<List<string>>, which contains 44 lists that I would like to process in parallel (five at a time, say).
My process has a signature like
private ProcessResult Process(List<string> input)
The processing will return a result, which is a pair of Boolean values, as below.
private struct ProcessResult
{
    public ProcessResult(bool initialised, bool successful)
    {
        ProcessInitialised = initialised;
        ProcessSuccessful = successful;
    }
    public bool ProcessInitialised { get; }
    public bool ProcessSuccessful { get; }
}
The problem. Given an IEnumerable<List<string>> processMe, my PLINQ query tries to use this overload: https://msdn.microsoft.com/en-us/library/dd384151(v=vs.110).aspx. It is written as
processMe.AsParallel()
    .Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult>
    (
        new ConcurrentStack<ProcessResult>(), //aggregator seed
        (agg, input) =>
        { //updating the aggregate result
            var res = Process(input);
            agg.Push(res);
            return agg;
        },
        agg =>
        { //obtain the result from the aggregator agg
            ProcessResult res; // (in this case just the most recent result**)
            agg.TryPop(out res);
            return res;
        }
    );
Unfortunately it does not run in parallel, only sequentially. (** note that this implementation doesn't make "sense", I am just trying to get the parallelisation to work for now.)
I tried a slightly different implementation, which did run in parallel, but there was no aggregation. I defined an aggregation method (essentially a Boolean AND on both parts of ProcessResult, i.e. aggregate([A1, A2], [B1, B2]) ≡ [A1 && B1, A2 && B2]).
private static ProcessResult AggregateProcessResults
    (ProcessResult aggregate, ProcessResult latest)
{
    bool ini = false, suc = false;
    if (aggregate.ProcessInitialised && latest.ProcessInitialised)
        ini = true;
    if (aggregate.ProcessSuccessful && latest.ProcessSuccessful)
        suc = true;
    return new ProcessResult(ini, suc);
}
I then used the PLINQ overload documented at https://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx:
processMe.AsParallel()
    .Aggregate<List<string>, ProcessResult, ProcessResult>(
        new ProcessResult(true, true),
        (res, input) => Process(input),
        (agg, latest) => AggregateProcessResults(agg, latest),
        agg => agg
    );
The problem here was that the AggregateProcessResults code was never hit, for some reason. I have no idea where the results were going...
Thanks for reading, any help appreciated :)
The overload of Aggregate that you use will indeed not run in parallel, by design. You pass a seed and then a step function, but the argument to the step function (agg) is the accumulator returned by the previous step. For that reason it is inherently sequential (the result of the previous step is the input to the next step) and cannot be parallelized. I am not sure why this overload is included in ParallelEnumerable, but there was probably a reason.
Instead, use the overload that takes a seed factory and a function for combining per-partition accumulators:
var result = processMe
    .AsParallel()
    .Aggregate
    (
        // seed factory. Each partition will call this to get its own seed
        () => new ConcurrentStack<ProcessResult>(),
        // process element and update accumulator
        (agg, input) =>
        {
            var res = Process(input);
            agg.Push(res);
            return agg;
        },
        // combine accumulators from different partitions
        (agg1, agg2) =>
        {
            agg1.PushRange(agg2.ToArray());
            return agg1;
        },
        // reduce
        agg =>
        {
            ProcessResult res;
            agg.TryPop(out res);
            return res;
        }
    );
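For the Boolean AND aggregation described in the question, the same seed-factory overload can probably carry a ProcessResult accumulator directly, with no stack needed. The following is only a minimal sketch under some assumptions: the Process body is a hypothetical stand-in (the real one is not shown in the question), the class name AggregateSketch and the helper names And and ProcessAll are made up for illustration, and WithDegreeOfParallelism(5) is my guess at how to express "five at a time" (it caps the number of concurrently executing tasks rather than guaranteeing exactly five).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class AggregateSketch
{
    public struct ProcessResult
    {
        public ProcessResult(bool initialised, bool successful)
        {
            ProcessInitialised = initialised;
            ProcessSuccessful = successful;
        }
        public bool ProcessInitialised { get; }
        public bool ProcessSuccessful { get; }
    }

    // Hypothetical stand-in for the question's Process method:
    // "initialised" if the list is non-empty, "successful" if no entry is blank.
    public static ProcessResult Process(List<string> input)
    {
        Thread.Sleep(50); // simulate some work
        return new ProcessResult(
            input.Count > 0,
            input.All(s => !string.IsNullOrWhiteSpace(s)));
    }

    // Boolean AND on both parts, as in AggregateProcessResults.
    public static ProcessResult And(ProcessResult a, ProcessResult b) =>
        new ProcessResult(
            a.ProcessInitialised && b.ProcessInitialised,
            a.ProcessSuccessful && b.ProcessSuccessful);

    public static ProcessResult ProcessAll(IEnumerable<List<string>> processMe) =>
        processMe
            .AsParallel()
            .WithDegreeOfParallelism(5) // assumption: at most five concurrent tasks
            .Aggregate<List<string>, ProcessResult, ProcessResult>(
                // seed factory: (true, true) is the identity for AND,
                // so every partition can safely start from it
                () => new ProcessResult(true, true),
                // fold the new result INTO the accumulator instead of discarding it
                (agg, input) => And(agg, Process(input)),
                // merge the accumulators of two partitions
                (agg1, agg2) => And(agg1, agg2),
                // nothing left to reduce
                agg => agg);
}
```

Note that the update step uses its agg argument. In the second query from the question, (res, input) => Process(input) ignores res, so each partition would only keep the result of its last element.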