How to properly run multiple async tasks in parallel?

What if you need to run multiple asynchronous I/O tasks in parallel, but need to make sure that no more than X I/O operations are running at the same time, while the pre- and post-I/O processing tasks have no such limitation?

Here is a scenario: let's say there are 1000 tasks. Each of them accepts a text string as an input parameter, transforms that text (pre-I/O processing), then writes the transformed text into a file. The goal is to make the pre-processing logic utilize 100% of the CPU/cores, while the I/O portion of the tasks runs with a maximum degree of parallelism of 10 (at most 10 files open for writing at a time).

Can you provide sample code showing how to do this with C# / .NET 4.5?

http://blogs.msdn.com/b/csharpfaq/archive/2012/01/23/using-async-for-file-access-alan-berman.aspx

Grief Coder asked May 29 '12 14:05



3 Answers

I think using TPL Dataflow for this would be a good idea: you create pre- and post-process blocks with unbounded parallelism, a file-writing block with limited parallelism and link them together. Something like:

// Requires the TPL Dataflow library: using System.Threading.Tasks.Dataflow;
var unboundedParallelismOptions =
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    };

var preProcessBlock = new TransformBlock<string, string>(
    s => PreProcess(s), unboundedParallelismOptions);

var writeToFileBlock = new TransformBlock<string, string>(
    async s =>
    {
        await WriteToFile(s);
        return s;
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

var postProcessBlock = new ActionBlock<string>(
    s => PostProcess(s), unboundedParallelismOptions);

var propagateCompletionOptions =
    new DataflowLinkOptions { PropagateCompletion = true };

preProcessBlock.LinkTo(writeToFileBlock, propagateCompletionOptions);
writeToFileBlock.LinkTo(postProcessBlock, propagateCompletionOptions);

// use something like await preProcessBlock.SendAsync("text") here

preProcessBlock.Complete();
await postProcessBlock.Completion;

Where WriteToFile() could look like this:

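private static async Task WriteToFile(string s)
{
    // useAsync: true requests asynchronous (overlapped) file I/O
    using (var stream = new FileStream(GetFileName(), FileMode.Create, FileAccess.Write, FileShare.None, 4096, useAsync: true))
    using (var writer = new StreamWriter(stream))
    {
        await writer.WriteAsync(s);
        await writer.FlushAsync(); // flush before Dispose so it isn't done synchronously
    }
}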
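To see the blocks working together, here is a minimal, self-contained sketch of the same pipeline fed with inputs. It assumes the TPL Dataflow library is available (it ships with .NET 6 and later; older targets need the System.Threading.Tasks.Dataflow NuGet package). `PreProcess`, `WriteToFile` and `PostProcess` are hypothetical stand-ins: the "write" is simulated with `Task.Delay` and instrumented to record how many writes overlap, so the limit of 10 can be checked.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowPipelineDemo
{
    static readonly object Gate = new object();
    static int inFlightWrites;     // simulated writes currently running
    public static int PeakWrites;  // highest number of overlapping writes seen
    public static int Processed;   // items that reached PostProcess

    // Hypothetical stand-ins for the answer's PreProcess/WriteToFile/PostProcess.
    static string PreProcess(string s) => s.ToUpperInvariant();

    static async Task WriteToFile(string s)
    {
        lock (Gate) { inFlightWrites++; PeakWrites = Math.Max(PeakWrites, inFlightWrites); }
        await Task.Delay(5); // simulated I/O instead of a real file write
        lock (Gate) inFlightWrites--;
    }

    static void PostProcess(string s) => Interlocked.Increment(ref Processed);

    public static async Task RunAsync(int count)
    {
        var unbounded = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        };

        var pre = new TransformBlock<string, string>(s => PreProcess(s), unbounded);
        var write = new TransformBlock<string, string>(async s =>
        {
            await WriteToFile(s);
            return s;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });
        var post = new ActionBlock<string>(s => PostProcess(s), unbounded);

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        pre.LinkTo(write, link);
        write.LinkTo(post, link);

        // Feed the pipeline, then signal that no more input is coming.
        for (int i = 0; i < count; i++)
            await pre.SendAsync("text " + i);

        pre.Complete();
        await post.Completion; // completion propagates pre -> write -> post
    }
}
```

After `await DataflowPipelineDemo.RunAsync(1000)`, `Processed` should be 1000 and `PeakWrites` no higher than 10. No `BoundedCapacity` is set here, so if pre-processing outpaces the writes, transformed strings simply queue up in memory in front of the write block.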
svick answered Nov 04 '22 05:11


It sounds like you'd want to consider a Dijkstra semaphore to control when tasks are allowed to start.

However, this sounds like a typical producer/consumer problem (a queue served by a fixed number of consumers), and structuring it that way may be more appropriate.
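A minimal sketch of the queue/fixed-consumer structure, assuming all inputs can be queued up front: a `ConcurrentQueue<T>` holds the work and exactly `consumerCount` async loops drain it, so at most that many items are processed at once. `FixedConsumers` and `RunAsync` are hypothetical names, not a library API.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FixedConsumers
{
    // Processes every item with exactly consumerCount worker loops. Each worker
    // dequeues until the queue is empty, so at most consumerCount calls to
    // consume are in progress at any moment.
    public static Task RunAsync<T>(IEnumerable<T> items, int consumerCount, Func<T, Task> consume)
    {
        var queue = new ConcurrentQueue<T>(items);

        var consumers = Enumerable.Range(0, consumerCount)
            .Select(async _ =>
            {
                T item;
                while (queue.TryDequeue(out item))
                    await consume(item);
            })
            .ToList(); // start all consumers now

        return Task.WhenAll(consumers);
    }
}
```

For the question's scenario, `consume` would be the file write with a consumer count of 10, and the pre-processing would happen before items are queued (or in a separate, unthrottled stage).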

Jeff Watkins answered Nov 04 '22 05:11


I would create an extension method that lets you set the maximum degree of parallelism. SemaphoreSlim is the key here.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public static class EnumerableExtensions
    {
        /// <summary>
        /// Concurrently executes an async action for each item of an <see cref="IEnumerable{T}"/>.
        /// </summary>
        /// <typeparam name="T">Type of the items in the sequence</typeparam>
        /// <param name="enumerable">The <see cref="IEnumerable{T}"/> whose items are processed</param>
        /// <param name="action">An async <see cref="Func{T, TResult}"/> to execute for each item</param>
        /// <param name="maxDegreeOfParallelism">Optional. The maximum number of actions running at the same time;
        /// must be greater than 0. If null, all actions are started at once.</param>
        /// <returns>A Task representing the asynchronous operation</returns>
        /// <exception cref="ArgumentOutOfRangeException">If <paramref name="maxDegreeOfParallelism"/> is less than 1</exception>
        public static async Task ForEachAsyncConcurrent<T>(
            this IEnumerable<T> enumerable,
            Func<T, Task> action,
            int? maxDegreeOfParallelism = null)
        {
            if (maxDegreeOfParallelism.HasValue)
            {
                if (maxDegreeOfParallelism.Value < 1)
                    throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");

                using (var semaphoreSlim = new SemaphoreSlim(
                    maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
                {
                    var tasksWithThrottler = new List<Task>();

                    foreach (var item in enumerable)
                    {
                        // Take a slot; waits asynchronously while the limit is reached.
                        await semaphoreSlim.WaitAsync();

                        tasksWithThrottler.Add(Task.Run(async () =>
                        {
                            try
                            {
                                await action(item);
                            }
                            finally
                            {
                                // Free the slot even if the action throws; unlike ContinueWith,
                                // this lets the exception reach Task.WhenAll below.
                                semaphoreSlim.Release();
                            }
                        }));
                    }

                    // Wait for all tasks to complete (rethrows if any action failed).
                    await Task.WhenAll(tasksWithThrottler);
                }
            }
            else
            {
                await Task.WhenAll(enumerable.Select(item => action(item)));
            }
        }
    }

Sample Usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
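`ForEachAsyncConcurrent` throttles the whole action, so any pre-processing done inside it is limited by `maxDegreeOfParallelism` as well. The question wants only the I/O throttled; one way to get that with the same SemaphoreSlim idea is to acquire the semaphore after the CPU-bound step. A minimal sketch, where `ThrottledWrites`, `PreProcess` (reverse and upper-case) and the `out{i}.txt` file names are hypothetical:

```csharp
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledWrites
{
    static readonly object Gate = new object();
    static int inFlight;          // writes currently in progress
    public static int PeakWrites; // highest number of simultaneous writes seen

    // Hypothetical CPU-bound transformation standing in for the real pre-processing.
    static string PreProcess(string s)
    {
        char[] chars = s.ToCharArray();
        Array.Reverse(chars);
        return new string(chars).ToUpperInvariant();
    }

    public static async Task<string[]> RunAsync(string[] inputs, string directory, int maxWrites)
    {
        using (var throttle = new SemaphoreSlim(maxWrites, maxWrites))
        {
            var tasks = inputs.Select((text, i) => Task.Run(async () =>
            {
                // CPU-bound step: runs on the thread pool, not throttled.
                string transformed = PreProcess(text);

                // I/O step: at most maxWrites files are open at the same time.
                await throttle.WaitAsync();
                try
                {
                    lock (Gate) { inFlight++; PeakWrites = Math.Max(PeakWrites, inFlight); }
                    string path = Path.Combine(directory, "out" + i + ".txt");
                    using (var stream = new FileStream(path, FileMode.Create, FileAccess.Write,
                               FileShare.None, bufferSize: 4096, useAsync: true))
                    using (var writer = new StreamWriter(stream))
                    {
                        await writer.WriteAsync(transformed);
                        await writer.FlushAsync();
                    }
                    return path;
                }
                finally
                {
                    lock (Gate) inFlight--;
                    throttle.Release();
                }
            }));

            // WhenAll returns the paths in the same order as the inputs.
            return await Task.WhenAll(tasks);
        }
    }
}
```

Because `WaitAsync` doesn't block a thread, tasks waiting for a write slot don't tie up the thread pool, which stays available for pre-processing other items.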
Jay Shah answered Nov 04 '22 06:11