I would like to report progress from a long running PLINQ query.
I can't find any built-in PLINQ method that does this, the way there is for cancellation (WithCancellation).
I have read this article, which shows a neat extension method for a regular sequential query.
I have been testing the behavior using the code below.
```csharp
using System;
using System.Linq;
using System.Reactive.Subjects; // BehaviorSubject<T> (System.Reactive package)
using System.Threading;

var progress = new BehaviorSubject<int>(0);
DateTime start = DateTime.Now;
progress.Subscribe(x => { Console.WriteLine(x); });

Enumerable.Range(1, 1000000)
    //.WithProgressReporting(i => progress.OnNext(i)) // Beginning progress reporting
    .AsParallel()
    .AsOrdered()
    //.WithProgressReporting(i => progress.OnNext(i)) // Middle progress reporting
    .Select(v => { Thread.Sleep(1); return v * v; })
    //.WithProgressReporting(i => progress.OnNext(i)) // End progress reporting
    .ToList();

Console.WriteLine("Completed in: " + (DateTime.Now - start).TotalSeconds + " seconds");
```
Edit:
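Reporting progress from the middle using the IEnumerable<T> extension removes the parallelism: the extension returns an IEnumerable<T>, so the Select that follows binds to Enumerable.Select instead of ParallelEnumerable.Select and runs sequentially.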
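A quick way to confirm this is to check the type of the query after the extension. The sketch below assumes a hypothetical IEnumerable<T> extension named WithSequentialProgress, written in the spirit of the article's extension (the article's real name and code may differ). Without the extension the query stays a ParallelQuery<int>; with it, the next Select binds to Enumerable.Select and the query is no longer parallel.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sequential extension, similar in spirit to the one in the linked article.
public static class SequentialProgressExtensions
{
    public static IEnumerable<T> WithSequentialProgress<T>(
        this IEnumerable<T> source, Action<int> report)
    {
        int count = 0;
        // foreach over a ParallelQuery<T> merges its results onto this single thread.
        foreach (T item in source)
        {
            report(++count);
            yield return item;
        }
    }
}

public static class ParallelismCheck
{
    // Control: no extension in the chain, so Select binds to ParallelEnumerable.Select.
    public static bool ControlStaysParallel()
    {
        IEnumerable<int> query = Enumerable.Range(1, 100)
            .AsParallel()
            .Select(v => v * v);
        return query is ParallelQuery<int>;
    }

    // With the IEnumerable<T> extension in the middle, Select binds to Enumerable.Select.
    public static bool ExtensionStaysParallel()
    {
        IEnumerable<int> query = Enumerable.Range(1, 100)
            .AsParallel()
            .WithSequentialProgress(_ => { })
            .Select(v => v * v);
        return query is ParallelQuery<int>;
    }
}
```

Nothing is enumerated by the two checks; they only inspect which Select overload the compiler picked.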
Reporting from the end does not report any progress while the parallel calculations are running, then quickly reports all of the progress at the very end. I assume this is the progress of collecting the parallel results into the list.
I originally thought that reporting progress from the beginning was causing the LINQ query to run sequentially. After sleeping on it, and reading comments from Peter Duniho, I see that it does run in parallel, but it produces so many progress reports that handling them slows my test/application significantly.
Is there a parallel, thread-safe way to report progress from a PLINQ query in increments that let a user know progress is being made, without significantly affecting the method's runtime?
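One way to cut the number of reports, sketched here under my own assumptions (WithThrottledProgress and its step parameter are hypothetical, not a PLINQ API), is to keep the per-item work to a single Interlocked.Increment and only invoke the callback when the count reaches a multiple of step. A lock around the callback, taken at most once every step items, keeps reports from running concurrently or going backwards.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class ThrottledProgressExtensions
{
    // Hypothetical helper: calls `report` only when the running count is a multiple of `step`.
    public static ParallelQuery<T> WithThrottledProgress<T>(
        this ParallelQuery<T> source, long step, Action<long> report)
    {
        if (step <= 0) throw new ArgumentOutOfRangeException(nameof(step));
        if (report == null) throw new ArgumentNullException(nameof(report));

        long count = 0;            // shared by all worker threads
        long lastReported = 0;     // only read and written inside the lock
        object gate = new object();

        return source.Select(item =>
        {
            long current = Interlocked.Increment(ref count);
            if (current % step == 0)
            {
                lock (gate)
                {
                    // A delayed worker may arrive with an older count; skip it
                    // so the callback never sees progress move backwards.
                    if (current > lastReported)
                    {
                        lastReported = current;
                        report(current);
                    }
                }
            }
            return item;
        });
    }
}
```

With the question's 1,000,000 items and a step of 10,000, this produces at most 100 reports instead of one per item. The counter lives in the closure created by each call, so enumerating the same query object twice keeps counting from where the first run stopped.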
This answer might not be as elegant, but it gets the job done.
When using PLINQ, multiple threads process your collection, so reporting progress from those threads produces many, out-of-order progress reports such as 0% 1% 5% 4% 3% etc.
Instead, we can have those threads update a shared variable that stores the progress. In my example, it's the local variable completed. We then start a background task with Task.Run() that reports on that variable at 0.5 s intervals.
Extension class:
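```csharp
using System;
using System.Linq;

static class Extensions
{
    // Pass-through Select that calls `increment` once per item. The result is still a
    // ParallelQuery<T>, so the operators that follow stay parallel.
    public static ParallelQuery<T> WithProgressReporting<T>(this ParallelQuery<T> sequence, Action increment)
    {
        return sequence.Select(x =>
        {
            increment?.Invoke();
            return x;
        });
    }
}
```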
Code:
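```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        const int total = 100000;
        long completed = 0;

        // Background reporter: samples the shared counter every 0.5 s.
        Task.Run(() =>
        {
            long done;
            while ((done = Interlocked.Read(ref completed)) < total)
            {
                Console.WriteLine(done * 100 / total + "%");
                Thread.Sleep(500);
            }
        });

        DateTime start = DateTime.Now;
        var output = Enumerable.Range(1, total)
            .AsParallel()
            // Thread-safe increment; counts each item as it enters the Select below.
            .WithProgressReporting(() => Interlocked.Increment(ref completed))
            .Select(v => { Thread.Sleep(1); return v * v; })
            .ToList();

        Console.WriteLine("Completed in: " + (DateTime.Now - start).TotalSeconds + " seconds");
        Console.ReadKey();
    }
}
```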
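The same idea can be packaged so the reporter stops deterministically and always sends a final value. This is a hypothetical variant, not the code above: RunWithProgress is an invented helper name, and it swaps the Task.Run polling loop for a System.Threading.Timer. Counting after the selector runs means the value reflects finished items rather than started ones.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class PolledProgress
{
    // Hypothetical helper: workers only bump a shared counter; a timer samples it
    // every `interval`, and one final report is sent after the query completes.
    public static List<TResult> RunWithProgress<TSource, TResult>(
        IEnumerable<TSource> source,
        Func<TSource, TResult> selector,
        Action<long> onProgress,
        TimeSpan interval)
    {
        long completed = 0;
        var timer = new Timer(_ => onProgress(Interlocked.Read(ref completed)),
                              null, interval, interval);
        List<TResult> results;
        try
        {
            results = source.AsParallel()
                .AsOrdered()                               // keep results in input order
                .Select(item =>
                {
                    TResult result = selector(item);
                    Interlocked.Increment(ref completed);  // count finished items only
                    return result;
                })
                .ToList();
        }
        finally
        {
            // Timer.Dispose(WaitHandle) signals the handle once queued callbacks have finished,
            // so no timer report can arrive after the final one below.
            using (var callbacksDone = new ManualResetEvent(false))
            {
                if (timer.Dispose(callbacksDone))
                    callbacksDone.WaitOne();
            }
        }
        onProgress(Interlocked.Read(ref completed));
        return results;
    }
}
```

Timer callbacks run on the thread pool and can overlap if onProgress takes longer than interval, so the callback should stay cheap, for example just writing a percentage to the console.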