C# threads when executing many tasks in Task.WhenAll

What will happen if, on a single thread, I do this:

await Task.WhenAll(items.Select(x => SomeAsyncMethod(x)));

// Where SomeAsyncMethod is defined like this (writeAsync is pure async io)
async Task SomeAsyncMethod(Item item){
  await myDevice.writeAsync(...).ConfigureAwait(false);
  //do some cpu intensive stuff...
}

Say there are 10,000 items in items. When each SomeAsyncMethod call continues after the await, it does so on a thread from the thread pool. So when many of the SomeAsyncMethod calls return, will several thread-pool threads be used simultaneously, or will only a single thread execute "do some cpu intensive stuff" in SomeAsyncMethod at any given moment?

UPDATE: OK, here is a sample program. When I test it on a PC with 8 logical cores, the minimum thread count is 12 or 13 and the maximum ends up in the 35-40 range, so it looks as if up to 4 threads are created per logical core. It does not matter whether 10,000 or 100,000 files are created; the same maximum number of threads is used. Maybe this is because all the tasks queue up waiting for access to the file system? Please note that this program creates lots of small files in c:\tmp\asynctest:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication4 {
    internal class Program {
        private static void Main(string[] args) {
            var myDevice = new MyDevice();
            var ints = new List<int>();
            for (var i = 0; i < 10000; i++) {
                ints.Add(i);
            }
            var task = Task.WhenAll(ints.Select(i => myDevice.WriteTextAsync(i.ToString())));
            task.Wait();
            Console.WriteLine("Max thread count = " + myDevice.MaxThreadCount);
            Console.WriteLine("Min thread count = " + myDevice.MinThreadCount);
            Console.ReadLine();
        }
    }

    public class MyDevice {
        private readonly object _statsLock = new object();
        public int MaxThreadCount;
        public int MinThreadCount = Process.GetCurrentProcess().Threads.Count;
        public async Task WriteTextAsync(string text) {
            // Note: the directory c:\tmp\asynctest must already exist.
            var filePath = @"c:\tmp\asynctest\" + text + ".txt";
            var encodedText = Encoding.Unicode.GetBytes(text);
            using (var sourceStream = new FileStream(filePath,
                FileMode.Append, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: true)) {
                await sourceStream.WriteAsync(encodedText, 0, encodedText.Length).ConfigureAwait(false);
                // Continuations can run concurrently on pool threads, so guard the shared counters.
                var threadCount = Process.GetCurrentProcess().Threads.Count;
                lock (_statsLock) {
                    MaxThreadCount = Math.Max(MaxThreadCount, threadCount);
                    MinThreadCount = Math.Min(MinThreadCount, threadCount);
                }
            }
        }
    }
}

UPDATE 2: If I start multiple threads that each run lots of async I/O tasks simultaneously, it does not look as if more threads are used in total than in the single-threaded example in the first update. In the test I just ran, where each of 4 threads creates 10,000 files, the maximum thread count was 41 and the minimum 12, so there seems to be some central control over how many threads are used for async task continuations. Here is the example, where 4 threads start 10,000 async operations each:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication4 {
    internal class Program {
        private static void Main(string[] args) {
            var myDevice = new MyDevice();
            var ints = new List<int>();
            const int limit = 10000;
            for (var i = 0; i < limit; i++) {
                ints.Add(i);
            }

            List<Task> jobs = new List<Task>();
            for (var j = 0; j < 4*limit; j+=limit) {
                var jobid = j;
                jobs.Add(Task.Run(() => Runjob(ints, myDevice, jobid)));
            }
            Task.WaitAll(jobs.ToArray());

            Console.WriteLine("Max thread count = " + myDevice.MaxThreadCount);
            Console.WriteLine("Min thread count = " + myDevice.MinThreadCount);
            Console.ReadLine();
        }

        private static void Runjob(List<int> ints, MyDevice myDevice, int jobid) {
            Console.WriteLine("Starting job " + jobid);
            var task = Task.WhenAll(ints.Select(i => myDevice.WriteTextAsync((jobid+i).ToString())));
            task.Wait();
            Console.WriteLine("Finished job " + jobid);
        }
    }

    public class MyDevice {
        private readonly object _statsLock = new object();
        public int MaxThreadCount;
        public int MinThreadCount = Process.GetCurrentProcess().Threads.Count;
        public async Task WriteTextAsync(string text) {
            // Note: the directory c:\tmp\asynctest must already exist.
            var filePath = @"c:\tmp\asynctest\" + text + ".txt";
            var encodedText = Encoding.Unicode.GetBytes(text);
            using (var sourceStream = new FileStream(filePath,
                FileMode.Append, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: true)) {
                await sourceStream.WriteAsync(encodedText, 0, encodedText.Length).ConfigureAwait(false);
                // Continuations can run concurrently on pool threads, so guard the shared counters.
                var threadCount = Process.GetCurrentProcess().Threads.Count;
                lock (_statsLock) {
                    MaxThreadCount = Math.Max(MaxThreadCount, threadCount);
                    MinThreadCount = Math.Min(MinThreadCount, threadCount);
                }
            }
        }
    }
}

Stig Schmidt Nielsson asked Jan 31 '17 08:01


1 Answer

The most likely scenario is that each piece of "CPU intensive stuff" will run on an arbitrary thread-pool thread, and if it's truly CPU-bound, you'll get about 1-2 threads per logical core doing the work.

The key point is that while the continuation of the original task (Task.WhenAll) will run back on the UI thread (if there is a synchronization context, of course), the continuations of the individual I/O operations will be posted to the thread pool, since you explicitly requested that the synchronization context be ignored (ConfigureAwait(false)).
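To see how many continuations actually run at the same time, a rough experiment like the one below can help. It is only a sketch under stated assumptions: Task.Delay stands in for the real async I/O, Thread.SpinWait stands in for the CPU-intensive part, and the class and member names (ContinuationConcurrencyDemo, MeasurePeakAsync) are made up for illustration. The number it prints depends on the machine and the runtime.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationConcurrencyDemo
{
    private static int _running; // continuations currently inside the CPU section
    private static int _peak;    // highest value of _running observed so far

    // Hypothetical stand-in for SomeAsyncMethod from the question.
    private static async Task SomeAsyncMethod(int item)
    {
        // Assumption: Task.Delay plays the role of the asynchronous I/O.
        await Task.Delay(10).ConfigureAwait(false);

        int now = Interlocked.Increment(ref _running);

        // Lock-free "max" update: retry until _peak is at least 'now'.
        int seen;
        while (now > (seen = Volatile.Read(ref _peak)))
        {
            Interlocked.CompareExchange(ref _peak, now, seen);
        }

        Thread.SpinWait(200000); // simulated CPU-bound work
        Interlocked.Decrement(ref _running);
    }

    // Runs itemCount operations and returns the peak number of
    // continuations that were in the CPU section simultaneously.
    public static async Task<int> MeasurePeakAsync(int itemCount)
    {
        _running = 0;
        _peak = 0;
        await Task.WhenAll(Enumerable.Range(0, itemCount).Select(i => SomeAsyncMethod(i)));
        return _peak;
    }

    public static void Main()
    {
        int peak = MeasurePeakAsync(1000).GetAwaiter().GetResult();
        Console.WriteLine("Logical cores: " + Environment.ProcessorCount);
        Console.WriteLine("Peak concurrent continuations: " + peak);
    }
}
```

A peak above 1 means that several thread-pool threads were executing the CPU section at once. Comparing it with the logical core count gives a feel for how the pool schedules the continuations.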

However, there is also a chance that everything will run on the original thread - if the I/O request completes synchronously. In that case, no asynchronous dispatch is done, and there is no opportunity for the tasks to switch threads. If you need to ensure parallelisation, you must use Task.Run explicitly.
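The synchronous-completion case can be illustrated with a small sketch. It assumes that an already-completed task (Task.CompletedTask, available from .NET Framework 4.6) is a fair model of an I/O request that completes synchronously. The names SyncCompletionDemo, InlineAsync and OffloadedAsync are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SyncCompletionDemo
{
    // The awaited task is already complete, so the method never yields:
    // the code after the await runs on the caller's thread, even with
    // ConfigureAwait(false).
    public static async Task<int> InlineAsync()
    {
        await Task.CompletedTask.ConfigureAwait(false);
        return Thread.CurrentThread.ManagedThreadId;
    }

    // Wrapping the same body in Task.Run queues it to the thread pool,
    // so it does not run inline on the caller's thread.
    public static Task<int> OffloadedAsync()
    {
        return Task.Run(async () =>
        {
            await Task.CompletedTask.ConfigureAwait(false);
            return Thread.CurrentThread.ManagedThreadId;
        });
    }

    public static void Main()
    {
        int caller = Thread.CurrentThread.ManagedThreadId;
        int inline = InlineAsync().GetAwaiter().GetResult();
        int offloaded = OffloadedAsync().GetAwaiter().GetResult();
        Console.WriteLine("caller=" + caller + " inline=" + inline + " offloaded=" + offloaded);
    }
}
```

In a console program, the inline value should match the caller's thread id, while the offloaded value normally comes from a pool thread. Applied to the question, this would mean wrapping the CPU-intensive part (or the whole call) in Task.Run if parallel execution must be guaranteed.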

It should also be noted that this is mostly implementation-dependent, and not something you can rely on. It might also be a bad approach for a heavily asynchronous-I/O application, since you might be running the CPU-intensive stuff on a thread from the I/O thread pool. That disturbs the framework's balancing of threads in the thread pool and prevents new asynchronous responses from coming through until you're finished with your work. This is especially true if the work you're doing isn't pure CPU work: blocking on a thread-pool thread can be quite painful on something like a web server, for example.

Luaan answered Sep 25 '22 11:09