Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rx extensions: Where is Parallel.ForEach?

I have a piece of code which is using Parallel.ForEach, probably based on a old version of Rx extensions or the Tasks Parallel Library. I installed a current version of Rx extensions but cannot find Parallel.ForEach. I'm not using any other fancy stuff of the library and just want to process some data in parallel like this:

Parallel.ForEach(records, ProcessRecord);

I found this question, but I would not like to depend on an old versions of Rx. But I was not able to find something similar for Rx, so what's the current and most straight forward way to do that using a current Rx version? The project is using .NET 3.5.

like image 906
Achim Avatar asked Dec 19 '11 08:12

Achim


2 Answers

No need to do all this silly goosery if you have Rx:

records.ToObservable()
    .SelectMany(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))
    .ToList()
    .First();

(Or, if you want the order of the items maintained at the cost of efficiency):

records.ToObservable()
    .Select(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))
    .Concat()
    .ToList()
    .First();

Or if you want to limit how many items at the same time:

records.ToObservable()
    .Select(x => Observable.Defer(() => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)))
    .Merge(5 /* at a time */)
    .ToList()
    .First();
like image 104
Ana Betts Avatar answered Sep 29 '22 05:09

Ana Betts


Here's a simple replacement:

class Parallel
{
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (body == null)
        {
            throw new ArgumentNullException("body");
        }
        var items = new List<T>(source);
        var countdown = new CountdownEvent(items.Count);
        WaitCallback callback = state =>
        {
            try
            {
                body((T)state);
            }
            finally
            {
                countdown.Signal();
            }
        };
        foreach (var item in items)
        {
            ThreadPool.QueueUserWorkItem(callback, item);
        }
        countdown.Wait();
    }
}
like image 26
dtb Avatar answered Sep 29 '22 07:09

dtb