
Any way to parallel yield in C#?

I have multiple enumerators that enumerate over flat files. Originally I ran each enumerator inside a Parallel.Invoke. Each Action added to a BlockingCollection<Entity>, and that collection was returned through GetConsumingEnumerable():

public interface IFlatFileQuery
{
    IEnumerable<Entity> Run();
}

public class FlatFile1 : IFlatFileQuery
{
    public IEnumerable<Entity> Run()
    {
        // loop over a flat file and yield each result
        yield return Entity;
    }
} 

public class Main
{
    public IEnumerable<Entity> DoLongTask(ICollection<IFlatFileQuery> _flatFileQueries)
    {
        // do some other stuff that needs to be returned first:
        yield return Entity;

        // then enumerate and return the flat file data
        foreach (var entity in GetData(_flatFileQueries))
        {
            yield return entity;
        }
    }

    private IEnumerable<Entity> GetData(IEnumerable<IFlatFileQuery> _flatFileQueries)
    {
        var buffer = new BlockingCollection<Entity>(100);

        var actions = _flatFileQueries.Select(fundFileQuery => (Action)(() =>
        {
            foreach (var entity in fundFileQuery.Run())
            {
                buffer.TryAdd(entity, Timeout.Infinite);
            }
        })).ToArray();

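        Task.Factory.StartNew(() =>
        {
            try
            {
                Parallel.Invoke(actions);
            }
            finally
            {
                // signal completion even if a query throws, so the consumer never blocks forever
                buffer.CompleteAdding();
            }
        });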

        return buffer.GetConsumingEnumerable();
    }
}

However, after a bit of testing, it turns out that the code change below is about 20-25% faster.

private IEnumerable<Entity> GetData(IEnumerable<IFlatFileQuery> _flatFileQueries)
{
    return _flatFileQueries.AsParallel().SelectMany(ffq => ffq.Run());
}

The trouble with the code change is that it waits till all flat file queries are enumerated before it returns the whole lot that can then be enumerated and yielded.

Would it be possible to yield in the above bit of code somehow to make it even faster?

I should add that the combined results of all the flat file queries amount to at most about 1,000 entities.

Edit: Changing it to the code below makes no difference to the run time (R# even suggests going back to the way it was).

private IEnumerable<Entity> GetData(IEnumerable<IFlatFileQuery> _flatFileQueries)
{
    foreach (var entity in _flatFileQueries.AsParallel().SelectMany(ffq => ffq.Run()))
    {
        yield return entity;
    }
}
Mark Vickery, asked Nov 14 '12 15:11



1 Answer

"The trouble with the code change is that it waits till all flat file queries are enumerated before it returns the whole lot that can then be enumerated and yielded."

Let's prove that this is false with a simple example. First, let's create a TestQuery class that yields a single entity after a given delay. Second, let's execute several test queries in parallel and measure when each result is yielded.

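using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

public class TestQuery : IFlatFileQuery {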

    private readonly int _sleepTime;

    public IEnumerable<Entity> Run() {
        Thread.Sleep(_sleepTime);
        return new[] { new Entity() };
    }

    public TestQuery(int sleepTime) {
        _sleepTime = sleepTime;
    }

}

internal static class Program {

    private static void Main() {
        Stopwatch stopwatch = Stopwatch.StartNew();
        var queries = new IFlatFileQuery[] {
            new TestQuery(2000),
            new TestQuery(3000),
            new TestQuery(1000)
        };
        foreach (var entity in queries.AsParallel().SelectMany(ffq => ffq.Run()))
            Console.WriteLine("Yielded after {0:N0} seconds", stopwatch.Elapsed.TotalSeconds);
        Console.ReadKey();
    }

}

This code prints:

Yielded after 1 seconds
Yielded after 2 seconds
Yielded after 3 seconds

This output shows that AsParallel() yields each result as soon as it's available, so everything works as expected. Note that you might get different timings depending on the degree of parallelism (such as "2s, 5s, 6s" with a degree of parallelism of 1, which effectively makes the whole operation sequential). This output comes from a 4-core machine.
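Both the degree of parallelism and the merge behavior can also be set explicitly on the query. The sketch below is an illustration under stated assumptions, not part of the original test: `MergeOptionsDemo` and `SlowQuery` are hypothetical names, and `SlowQuery` is a stand-in for TestQuery that yields an int instead of an Entity. It uses the PLINQ operators `WithDegreeOfParallelism` and `WithMergeOptions(ParallelMergeOptions.NotBuffered)`; the latter asks PLINQ to pass each element to the consumer as soon as a worker produces it, instead of relying on the default merge, which may buffer some results.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

public static class MergeOptionsDemo
{
    // Hypothetical stand-in for TestQuery.Run(): sleep, then yield one value.
    private static IEnumerable<int> SlowQuery(int sleepMs)
    {
        Thread.Sleep(sleepMs);
        yield return sleepMs;
    }

    // Returns the elapsed time in seconds at which each result reached the consumer.
    public static List<double> Run(int[] sleepTimesMs, int degreeOfParallelism)
    {
        var stopwatch = Stopwatch.StartNew();
        var arrivals = new List<double>();
        var query = sleepTimesMs
            .AsParallel()
            // Maximum number of concurrently executing tasks for this query.
            .WithDegreeOfParallelism(degreeOfParallelism)
            // Hand each element to the consumer as soon as a worker produces it.
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .SelectMany(ms => SlowQuery(ms));

        foreach (var value in query)
        {
            arrivals.Add(stopwatch.Elapsed.TotalSeconds);
        }
        return arrivals;
    }

    public static void Main()
    {
        int[] sleeps = { 2000, 3000, 1000 };
        foreach (int dop in new[] { 3, 1 })
        {
            var times = Run(sleeps, dop);
            Console.WriteLine("Degree of parallelism {0}: {1}", dop,
                string.Join(", ", times.Select(t => t.ToString("N0") + "s")));
        }
    }
}
```

With a degree of parallelism of 1, the arrival times should accumulate roughly like the "2s, 5s, 6s" example; with 3, each result should arrive close to its own sleep time, though exact timings depend on the thread pool and the machine.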

Your long-running processing will probably scale with the number of cores if there is no common bottleneck between the threads (such as a shared locked resource). You might want to profile your algorithm with a tool such as dotTrace to see whether there are slow parts that can be improved.
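To illustrate the kind of bottleneck mentioned above, here is a hypothetical variant in which every query does its work while holding one shared lock. `SharedLockDemo`, `SharedResource` and `QueryUsingSharedLock` are invented names for this sketch, and the lock is only a stand-in for any shared locked resource. Because only one thread can hold the lock at a time, the total run time should be roughly the sum of the individual work times, however many cores are available.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

public static class SharedLockDemo
{
    // Hypothetical shared resource that every query must lock before working.
    private static readonly object SharedResource = new object();

    // Hypothetical query: does its simulated work while holding the shared lock.
    private static IEnumerable<int> QueryUsingSharedLock(int workMs)
    {
        lock (SharedResource)
        {
            Thread.Sleep(workMs); // simulated work under the lock
        }
        yield return workMs;
    }

    // Returns the wall-clock time in seconds needed to consume every result.
    public static double TotalSeconds(int[] workTimesMs)
    {
        var stopwatch = Stopwatch.StartNew();
        workTimesMs
            .AsParallel()
            .SelectMany(ms => QueryUsingSharedLock(ms))
            .ToList(); // force full enumeration of the parallel query
        return stopwatch.Elapsed.TotalSeconds;
    }

    public static void Main()
    {
        // The lock serializes the work, so expect roughly 2 + 3 + 1 = 6 seconds
        // even on a multi-core machine.
        Console.WriteLine("Total: {0:N1} seconds", TotalSeconds(new[] { 2000, 3000, 1000 }));
    }
}
```

Removing the lock from `QueryUsingSharedLock` should bring the total back toward the longest single work time, which is the kind of difference a profiler would help you spot in real code.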

Julien Lebosquain, answered Nov 02 '22 04:11
