
How to yield from parallel tasks in .NET 4.5

I would like to use a .NET iterator (yield return) together with parallel Tasks/await. Something like this:

IEnumerable<TDest> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    Parallel.ForEach(
        source,
        s =>
        {
            // Ordering is NOT important
            // items can be yielded as soon as they are done
            // (does not compile: yield return is not allowed inside a lambda)
            yield return ExecuteOrDownloadSomething(s);
        });
}

Unfortunately, .NET cannot handle this natively. The best answer so far, by @svick, is to use AsParallel().

BONUS: Is there any simple async/await code that implements multiple publishers and a single subscriber? The subscriber would yield, and the publishers would process (core libraries only).

Yuri Astrakhan asked Feb 11 '13 05:02


1 Answer

This seems like a job for PLINQ:

return source.AsParallel().Select(s => ExecuteOrDownloadSomething(s));

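This will execute the delegate in parallel using a limited number of threads and make results available to the caller as they complete, subject to PLINQ's output buffering (the default merge option is AutoBuffered, so results may arrive in small chunks rather than one by one).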
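
To make the one-liner above concrete, here is a minimal sketch of a complete method. It passes the work in as a delegate (the parameter name executeOrDownloadSomething is a stand-in for the question's method, and the class name ParallelYield is made up for illustration). It also asks PLINQ not to buffer output, which seems closest to "yield items as soon as they are done":

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ParallelYield
{
    // executeOrDownloadSomething is a hypothetical stand-in for the
    // question's ExecuteOrDownloadSomething method.
    public static IEnumerable<TDest> Foo<TSrc, TDest>(
        IEnumerable<TSrc> source,
        Func<TSrc, TDest> executeOrDownloadSomething)
    {
        return source
            .AsParallel()
            // Hand each result to the consumer as soon as it is produced,
            // instead of collecting results into chunks first.
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            // No AsOrdered() call, so results may arrive in any order.
            .Select(executeOrDownloadSomething);
    }
}
```

The caller can then iterate the result with a plain foreach; enumeration is lazy, so the parallel work starts when iteration begins.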

If the ExecuteOrDownloadSomething() method is IO-bound (e.g. it actually downloads something) and you don't want to waste threads, then using async-await might make sense, but it would be more complicated.

If you want to fully take advantage of async, you shouldn't return IEnumerable, because it's synchronous (i.e. it blocks if no items are available). What you need is some sort of asynchronous collection, and you can use ISourceBlock (specifically, TransformBlock) from TPL Dataflow for that:

ISourceBlock<TDest> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    var block = new TransformBlock<TSrc, TDest>(
        async s => await ExecuteOrDownloadSomethingAsync(s),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    foreach (var item in source)
        block.Post(item);

    block.Complete();

    return block;
}
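
For completeness, one way the caller might consume the returned block without blocking a thread is sketched below. The class name BlockConsumer and the handle callback are assumptions for illustration; OutputAvailableAsync and ReceiveAsync are the TPL Dataflow extension methods for ISourceBlock. The sketch assumes a single consumer:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BlockConsumer
{
    // handle is a hypothetical callback invoked once per received item.
    public static async Task ConsumeAsync<T>(ISourceBlock<T> block, Action<T> handle)
    {
        // OutputAvailableAsync returns false once the block has completed
        // and no more output will become available.
        while (await block.OutputAvailableAsync())
        {
            handle(await block.ReceiveAsync());
        }

        // Awaiting Completion rethrows any exception that faulted the block.
        await block.Completion;
    }
}
```

Note that, as far as I know, a TransformBlock emits results in input order by default even when it processes items in parallel, so one slow item can hold back later ones; newer versions of the library expose an EnsureOrdered option to relax this.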

If the source is “slow” (i.e. you want to start processing the results from Foo() before the iteration of source has finished), you might want to move the foreach and the Complete() call to a separate Task. An even better solution would be to make source an ISourceBlock<TSrc> too.
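
A rough sketch of the "separate Task" variant follows. The class name SlowSourceFeeder and the delegate parameter are assumptions for illustration, not part of the original answer. The block is returned immediately while a background task feeds it, and an exception thrown while enumerating source faults the block instead of being lost:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SlowSourceFeeder
{
    // executeOrDownloadSomethingAsync is a hypothetical stand-in for the
    // answer's ExecuteOrDownloadSomethingAsync method.
    public static ISourceBlock<TDest> Foo<TSrc, TDest>(
        IEnumerable<TSrc> source,
        Func<TSrc, Task<TDest>> executeOrDownloadSomethingAsync)
    {
        var block = new TransformBlock<TSrc, TDest>(
            executeOrDownloadSomethingAsync,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        // Feed the block in the background so the caller can start
        // receiving results before the source has been fully enumerated.
        Task.Run(() =>
        {
            try
            {
                foreach (var item in source)
                    block.Post(item);

                block.Complete();
            }
            catch (Exception ex)
            {
                // Fault is implemented explicitly, hence the cast.
                ((IDataflowBlock)block).Fault(ex);
            }
        });

        return block;
    }
}
```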

svick answered Oct 02 '22 14:10