 

Parallel invocation of elements of an IEnumerable

I have an extension method Batch returning IEnumerable<IEnumerable<T>> that works like this:

var list = new List<int>() { 1, 2, 4, 8, 10, -4, 3 }; 
var batches = list.Batch(2); 
foreach(var batch in batches)
    Console.WriteLine(string.Join(",", batch));

which prints:

1,2
4,8
10,-4
3

The problem I'm having is that when I try to optimize something like

foreach(var batch in batches)
    ExecuteBatch(batch);

by

Task[] tasks = batches.Select(batch => Task.Factory.StartNew(() => ExecuteBatch(batch))).ToArray();
Task.WaitAll(tasks);

or

Action[] executions = batches.Select(batch => new Action(() => ExecuteBatch(batch))).ToArray();
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.Invoke(options, executions);

(because ExecuteBatch is a long-running operation involving IO)

then each batch comes out mangled: it contains only a single element, which is default(int). Any idea what's happening, or how to fix it?

Batch:

public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
{
    for(var mover = source.GetEnumerator(); ;)
    {
        if(!mover.MoveNext())
            yield break;
        yield return LimitMoves(mover, size);
    }
}
private static IEnumerable<T> LimitMoves<T>(IEnumerator<T> mover, int limit)
{
    do yield return mover.Current;
    while(--limit > 0 && mover.MoveNext());
}
asked Jul 19 '17 by Born Under a Bad Sign

1 Answer

As noted in the comments, your actual issue is your implementation of Batch.

This code:

for(var mover = source.GetEnumerator(); ;)
{
    if(!mover.MoveNext())
        yield break;
    yield return LimitMoves(mover, size);
}

When Batch is materialized, this code repeatedly calls MoveNext() until the enumerable is exhausted. LimitMoves() shares the same enumerator and is lazily invoked. Since the outer loop has already exhausted the enumerable by the time each inner sequence is enumerated, LimitMoves() never sees a real element. In fact, each batch emits exactly one default(T), because LimitMoves always yields mover.Current at least once, and Current is default(T) once the enumerator is finished.
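To make the failure mode concrete, here is a minimal self-contained repro. The Batch/LimitMoves pair is copied from the question; the Main method and its sample list are illustrative additions:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class BatchDemo
{
    // The original (broken) Batch from the question, reproduced for illustration.
    public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
    {
        for (var mover = source.GetEnumerator(); ;)
        {
            if (!mover.MoveNext())
                yield break;
            yield return LimitMoves(mover, size);
        }
    }

    private static IEnumerable<T> LimitMoves<T>(IEnumerator<T> mover, int limit)
    {
        do yield return mover.Current;
        while (--limit > 0 && mover.MoveNext());
    }

    public static void Main()
    {
        var list = new List<int> { 1, 2, 4, 8, 10, -4, 3 };

        // Lazy consumption works: each inner sequence is drained before
        // the outer loop advances the shared enumerator again.
        foreach (var batch in list.Batch(2))
            Console.WriteLine(string.Join(",", batch));   // 1,2 / 4,8 / 10,-4 / 3

        // Materializing first runs the outer loop to completion, exhausting
        // the shared enumerator before any inner sequence is enumerated.
        var materialized = list.Batch(2).ToArray();
        foreach (var batch in materialized)
            Console.WriteLine(string.Join(",", batch));   // prints "0" for each batch
    }
}
```

Lazily consumed, each inner sequence drains the shared enumerator before the outer loop advances it, so the output looks correct; once ToArray() forces the outer loop to run to completion first, every inner sequence starts from an already-exhausted enumerator.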

Here's an implementation of Batch which will work when materialized (and thus when in parallel).

public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
{
    var mover = source.GetEnumerator();
    var currentSet = new List<T>();
    while (mover.MoveNext())
    {
        currentSet.Add(mover.Current);
        if (currentSet.Count >= size)
        {   
            yield return currentSet;
            currentSet = new List<T>();
        }
    }
    if (currentSet.Count > 0)
        yield return currentSet;
}
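With each batch captured in its own List<T>, the question's parallel approach works as expected. A small sketch, where ExecuteBatch is a stand-in for the question's long-running IO call that just prints the batch:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class FixedBatchExample
{
    // The corrected Batch from the answer: each batch is its own List<T>,
    // so the inner sequences no longer share a live enumerator.
    public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
    {
        var mover = source.GetEnumerator();
        var currentSet = new List<T>();
        while (mover.MoveNext())
        {
            currentSet.Add(mover.Current);
            if (currentSet.Count >= size)
            {
                yield return currentSet;
                currentSet = new List<T>();
            }
        }
        if (currentSet.Count > 0)
            yield return currentSet;
    }

    public static void Main()
    {
        var list = new List<int> { 1, 2, 4, 8, 10, -4, 3 };

        // Stand-in for the question's long-running IO operation.
        void ExecuteBatch(IEnumerable<int> batch) =>
            Console.WriteLine(string.Join(",", batch));

        // Materializing and running the batches concurrently is now safe.
        Task[] tasks = list.Batch(2)
            .Select(batch => Task.Run(() => ExecuteBatch(batch)))
            .ToArray();
        Task.WaitAll(tasks);
    }
}
```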

Alternatively, you could use MoreLINQ, which ships with a Batch implementation; its source is available in the MoreLINQ repository.

answered Oct 14 '22 by Rob