
How to limit the amount of concurrent async I/O operations?


You can definitely do this with the latest async support in .NET, using .NET 4.5 Beta. The previous post from 'usr' points to a good article written by Stephen Toub, but the less-announced news is that the async semaphore actually made it into the Beta release of .NET 4.5.

If you look at our beloved SemaphoreSlim class (which you should be using since it's more performant than the original Semaphore), it now boasts the WaitAsync(...) series of overloads, with all of the expected arguments - timeout intervals, cancellation tokens, all of your usual scheduling friends :)
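
As a rough illustration of those overloads, here is a minimal sketch that uses WaitAsync(TimeSpan, CancellationToken). The class and method names (WaitAsyncOverloadsDemo, TryEnterAsync, RunAsync) are made up for this example; only SemaphoreSlim and its WaitAsync overload come from the framework.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitAsyncOverloadsDemo
{
    // Hypothetical helper: tries to enter the semaphore, giving up after `timeout`
    // or when `token` is cancelled. Returns true if a slot was acquired
    // (the caller is then responsible for calling Release()).
    public static async Task<bool> TryEnterAsync(
        SemaphoreSlim semaphore, TimeSpan timeout, CancellationToken token)
    {
        // WaitAsync(TimeSpan, CancellationToken) completes with false on timeout
        // and throws OperationCanceledException if the token is cancelled first.
        return await semaphore.WaitAsync(timeout, token).ConfigureAwait(false);
    }

    public static async Task<string> RunAsync()
    {
        var semaphore = new SemaphoreSlim(initialCount: 1, maxCount: 1);

        // The single slot is free, so the first attempt succeeds.
        bool first = await TryEnterAsync(
            semaphore, TimeSpan.FromMilliseconds(50), CancellationToken.None);

        // The slot is still held, so the second attempt times out.
        bool second = await TryEnterAsync(
            semaphore, TimeSpan.FromMilliseconds(50), CancellationToken.None);

        semaphore.Release();
        return string.Format("{0},{1}", first, second);
    }
}
```

Checking the boolean result matters: if the wait timed out you never entered the semaphore, so you must not call Release() for that attempt.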

Stephen's also written a more recent blog post about the new .NET 4.5 goodies that came out with the beta; see What’s New for Parallelism in .NET 4.5 Beta.

Last, here's some sample code about how to use SemaphoreSlim for async method throttling:

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    var urls = new[] { "http://google.com", "http://yahoo.com", ... };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

Finally, one more option worth mentioning is TPL-based scheduling. You can create delegate-bound tasks on the TPL that have not yet been started, and let a custom task scheduler limit the concurrency. In fact, there's an MSDN sample for it; see also TaskScheduler.
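
To give a feel for scheduler-based limiting, here is a minimal sketch that uses the built-in ConcurrentExclusiveSchedulerPair (available since .NET 4.5) instead of a hand-written scheduler. This is a swapped-in technique, not the MSDN sample mentioned above, and the names LimitedSchedulerDemo and Run are invented for the example. Note that a scheduler limits how many delegates execute at once, so it suits synchronous work; an async lambda gives its slot back at the first await.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class LimitedSchedulerDemo
{
    // Runs `itemCount` blocking work items with at most `maxConcurrency`
    // executing at once, and returns the highest number seen running together.
    public static int Run(int itemCount, int maxConcurrency)
    {
        // The ConcurrentScheduler of the pair never runs more than
        // maxConcurrency tasks at the same time.
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrency);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        int running = 0, peak = 0;
        var gate = new object();

        var tasks = Enumerable.Range(0, itemCount).Select(i => factory.StartNew(() =>
        {
            lock (gate) { running++; peak = Math.Max(peak, running); }
            Thread.Sleep(20); // stand-in for synchronous work
            lock (gate) { running--; }
        })).ToArray();

        Task.WaitAll(tasks);
        return peak;
    }
}
```

For I/O-bound async calls such as HttpClient requests, the semaphore approach shown earlier is usually the better fit, because it counts whole operations rather than running delegates.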


If you have an IEnumerable (i.e. strings of URLs) and you want to perform an I/O-bound operation on each item (i.e. make an async HTTP request) concurrently, and optionally also set the maximum number of concurrent I/O requests at run time, here is how you can do that. This approach does not tie up thread-pool threads; it uses SemaphoreSlim to control the maximum number of concurrent I/O requests, similar to a sliding window pattern: one request completes and leaves the semaphore, and the next one gets in.

usage: await ForEachAsync(urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);

public static Task ForEachAsync<TIn>(
    IEnumerable<TIn> inputEnumerable,
    Func<TIn, Task> asyncProcessor,
    int? maxDegreeOfParallelism = null)
{
    // DefaultMaxDegreeOfParallelism is assumed to be a constant defined elsewhere in this class.
    int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
    SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

    // Each lambda waits for a free slot, runs the processor, then frees the slot.
    IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
    {
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            await asyncProcessor(input).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });

    // Task.WhenAll enumerates the lazy sequence once, which starts every lambda.
    return Task.WhenAll(tasks);
}
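
To see the sliding window in action, here is a small self-contained sketch that repeats the same semaphore pattern and records how many simulated requests are in flight at once. Task.Delay stands in for the HTTP call, and the names SlidingWindowDemo and MeasurePeakAsync are invented for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SlidingWindowDemo
{
    // Same semaphore pattern as above, repeated so the sketch compiles on its own.
    public static Task ForEachAsync<TIn>(
        IEnumerable<TIn> inputs, Func<TIn, Task> asyncProcessor, int maxConcurrency)
    {
        var throttler = new SemaphoreSlim(maxConcurrency, maxConcurrency);

        // ToList() starts every lambda exactly once.
        var tasks = inputs.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try { await asyncProcessor(input).ConfigureAwait(false); }
            finally { throttler.Release(); }
        }).ToList();

        return Task.WhenAll(tasks);
    }

    // Returns the peak number of simulated requests that were in flight together.
    public static async Task<int> MeasurePeakAsync(int itemCount, int maxConcurrency)
    {
        int inFlight = 0, peak = 0;
        var gate = new object();

        await ForEachAsync(Enumerable.Range(0, itemCount), async i =>
        {
            lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
            await Task.Delay(20).ConfigureAwait(false); // stand-in for an HTTP request
            lock (gate) { inFlight--; }
        }, maxConcurrency);

        return peak;
    }
}
```

Calling MeasurePeakAsync(10, 3) should never report a peak above 3, since a new item can only start once an earlier one has released its slot.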

There are a lot of pitfalls, and direct use of a semaphore can be tricky in error cases, so I would suggest using the AsyncEnumerator NuGet package instead of reinventing the wheel:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);

Unfortunately, the .NET Framework is missing the most important combinators for orchestrating parallel async tasks. There is no such thing built in.

Look at the AsyncSemaphore class built by the most respectable Stephen Toub. What you want is called a semaphore, and you need an async version of it.
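
For orientation, here is a minimal sketch of how an async semaphore can be built from a queue of TaskCompletionSource objects. It only illustrates the general idea and is not a copy of the class referred to above; the name AsyncSemaphoreSketch is made up, and there is no cancellation or timeout support.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Illustrative sketch only: waiters get a Task that completes when a slot frees up.
public sealed class AsyncSemaphoreSketch
{
    private static readonly Task Completed = Task.FromResult(true);
    private readonly Queue<TaskCompletionSource<bool>> _waiters =
        new Queue<TaskCompletionSource<bool>>();
    private int _currentCount;

    public AsyncSemaphoreSketch(int initialCount)
    {
        if (initialCount < 0) throw new ArgumentOutOfRangeException("initialCount");
        _currentCount = initialCount;
    }

    // Returns an already-completed task if a slot is free; otherwise queues the
    // caller and returns a task that completes on a later Release().
    public Task WaitAsync()
    {
        lock (_waiters)
        {
            if (_currentCount > 0)
            {
                _currentCount--;
                return Completed;
            }
            var waiter = new TaskCompletionSource<bool>();
            _waiters.Enqueue(waiter);
            return waiter.Task;
        }
    }

    // Hands the slot to the oldest waiter, or returns it to the pool if nobody waits.
    public void Release()
    {
        TaskCompletionSource<bool> toRelease = null;
        lock (_waiters)
        {
            if (_waiters.Count > 0) toRelease = _waiters.Dequeue();
            else _currentCount++;
        }
        // Complete the waiter outside the lock so its continuations do not run under it.
        if (toRelease != null) toRelease.SetResult(true);
    }
}
```

Usage mirrors SemaphoreSlim: await WaitAsync() before the operation and call Release() in a finally block afterwards.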


SemaphoreSlim can be very helpful here. Here's the extension method I've created.

    /// <summary>
    /// Concurrently executes async actions for each item of an <see cref="IEnumerable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the items in the IEnumerable</typeparam>
    /// <param name="enumerable">Instance of <see cref="IEnumerable{T}"/></param>
    /// <param name="action">An async function to execute for each item</param>
    /// <param name="maxActionsToRunInParallel">Optional, max number of actions to run in parallel.
    /// Must be greater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxActionsToRunInParallel = null)
    {
        if (maxActionsToRunInParallel.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

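                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        try
                        {
                            await action(item);
                        }
                        finally
                        {
                            // action has finished (successfully or not), so decrement the
                            // number of currently running tasks; unlike ContinueWith, this
                            // also lets exceptions from the action reach Task.WhenAll
                            semaphoreSlim.Release();
                        }
                    }));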
                }

                // Wait for all of the provided tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Sample Usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);