I have a piece of code that uses Parallel.ForEach, probably based on an old version of the Rx extensions or the Task Parallel Library. I installed a current version of the Rx extensions but cannot find Parallel.ForEach. I'm not using any other fancy features of the library and just want to process some data in parallel, like this:
Parallel.ForEach(records, ProcessRecord);
I found this question, but I would rather not depend on an old version of Rx. I was not able to find anything similar in current Rx, so what is the current and most straightforward way to do this with a current Rx version? The project is using .NET 3.5.
No need to do all this silly goosery if you have Rx:
records.ToObservable()
    .SelectMany(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPool))
    .ToList()
    .First();
(Or, if you want the order of the items maintained at the cost of efficiency):
records.ToObservable()
    .Select(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPool))
    .Concat()
    .ToList()
    .First();
Or, if you want to limit how many items are processed at the same time:
records.ToObservable()
    .Select(x => Observable.Defer(() => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPool)))
    .Merge(5 /* at a time */)
    .ToList()
    .First();
Here's a simple replacement:
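using System;
using System.Collections.Generic;
using System.Threading;

class Parallel
{
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (body == null)
        {
            throw new ArgumentNullException("body");
        }
        // Snapshot the sequence so the number of work items is known up front.
        var items = new List<T>(source);
        // CountdownEvent ships with .NET 4.0 and later; on .NET 3.5 it has to come from a backport.
        var countdown = new CountdownEvent(items.Count);
        WaitCallback callback = state =>
        {
            try
            {
                body((T)state);
            }
            finally
            {
                // Signal even if body throws, so Wait() is never left short a signal.
                countdown.Signal();
            }
        };
        foreach (var item in items)
        {
            ThreadPool.QueueUserWorkItem(callback, item);
        }
        // Block until every queued item has signalled.
        countdown.Wait();
    }
}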
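If CountdownEvent is not available (it was added in .NET 4.0), the same idea can probably be done with only types that exist in .NET 3.5. The following is a rough sketch under that assumption, not a tested drop-in: the class name SimpleParallel is made up here to avoid clashing with the real Parallel class, and wrapping the first failure in an InvalidOperationException is my own choice. It counts down with Interlocked and a ManualResetEvent, and it catches exceptions inside the work item, because an unhandled exception on a thread-pool thread would otherwise take down the process.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical name, chosen to avoid clashing with System.Threading.Tasks.Parallel.
static class SimpleParallel
{
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (body == null) throw new ArgumentNullException("body");

        var items = new List<T>(source);
        if (items.Count == 0) return;      // nothing to wait for

        int pending = items.Count;         // work items still running
        Exception firstError = null;       // first failure, rethrown on the caller
        using (var done = new ManualResetEvent(false))
        {
            foreach (var item in items)
            {
                T current = item;          // fresh copy per iteration for the closure
                ThreadPool.QueueUserWorkItem(delegate
                {
                    try
                    {
                        body(current);
                    }
                    catch (Exception ex)
                    {
                        // Keep only the first exception; later ones are dropped.
                        Interlocked.CompareExchange(ref firstError, ex, null);
                    }
                    finally
                    {
                        // The last item to finish releases the waiting caller.
                        if (Interlocked.Decrement(ref pending) == 0) done.Set();
                    }
                });
            }
            done.WaitOne();
        }
        if (firstError != null)
        {
            throw new InvalidOperationException("A work item failed.", firstError);
        }
    }
}
```

Usage would look the same as before, e.g. SimpleParallel.ForEach(records, ProcessRecord). Note that this queues every item at once and leaves throttling to the thread pool, just like the version above.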