
Thread count growth when using Task Parallel Library

I'm using the C# TPL and I'm having a problem with producer/consumer code: for some reason, the TPL doesn't reuse threads and keeps creating new ones without stopping.

I made a simple example to demonstrate this behavior:

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static BlockingCollection<int> m_Buffer = new BlockingCollection<int>(1);
    static CancellationTokenSource m_Cts = new CancellationTokenSource();

    static void Producer()
    {
        try
        {
            while (!m_Cts.IsCancellationRequested)
            {
                Console.WriteLine("Enqueuing job");
                m_Buffer.Add(0);
                Thread.Sleep(1000);
            }
        }
        finally
        {
            m_Buffer.CompleteAdding();
        }
    }

    static void Consumer()
    {
        Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), Run);
    }

    static void Run(int i)
    {
        Console.WriteLine
            ("Job Processed\tThread: {0}\tProcess Thread Count: {1}",
              Thread.CurrentThread.ManagedThreadId, 
              Process.GetCurrentProcess().Threads.Count);
    }

    static void Main(string[] args)
    {
        Task producer = new Task(Producer);
        Task consumer = new Task(Consumer);
        producer.Start();
        consumer.Start();

        Console.ReadKey();
        m_Cts.Cancel();

        Task.WaitAll(producer, consumer);
    }
}

This code creates 2 tasks, Producer and Consumer. Producer adds 1 work item every second, and Consumer only prints out a string with some information. I would assume that 1 consumer thread is enough in this situation, because items are processed much faster than they are added to the queue, but what actually happens is that every second the number of threads in the process grows by 1, as if the TPL were creating a new thread for every item.

While trying to understand what's happening, I also noticed another thing: even though the BlockingCollection size is 1, after a while Consumer starts getting called in bursts. For example, this is how it starts:

Enqueuing job
Job Processed Thread: 4 Process Thread Count: 9
Enqueuing job
Job Processed Thread: 6 Process Thread Count: 9
Enqueuing job
Job Processed Thread: 5 Process Thread Count: 10
Enqueuing job
Job Processed Thread: 4 Process Thread Count: 10
Enqueuing job
Job Processed Thread: 6 Process Thread Count: 11

and this is how it's processing items less than a minute later:

Enqueuing job
Job Processed Thread: 25 Process Thread Count: 52
Enqueuing job
Enqueuing job
Job Processed Thread: 5 Process Thread Count: 54
Job Processed Thread: 5 Process Thread Count: 54

Because threads get disposed of after the Parallel.ForEach loop finishes (I don't show it in this example, but it happened in the real project), I assumed it had something to do with ForEach specifically. I found this article http://reedcopsey.com/2010/01/26/parallelism-in-net-part-5-partitioning-of-work/ and thought my problem was caused by the default partitioner, so I took a custom partitioner from the TPL Examples that feeds consumer threads items one by one. Although that fixed the order of execution (got rid of the delay)...

Enqueuing job
Job Processed Thread: 71 Process Thread Count: 140
Enqueuing job
Job Processed Thread: 12 Process Thread Count: 141
Enqueuing job
Job Processed Thread: 72 Process Thread Count: 142
Enqueuing job
Job Processed Thread: 38 Process Thread Count: 143
Enqueuing job
Job Processed Thread: 73 Process Thread Count: 143
Enqueuing job
Job Processed Thread: 21 Process Thread Count: 144
Enqueuing job
Job Processed Thread: 74 Process Thread Count: 145

...it didn't stop the number of threads from growing.

I know about ParallelOptions.MaxDegreeOfParallelism, but I still want to understand what's happening inside the TPL and why it creates hundreds of threads for no reason.

In my project I have code that has to run for hours, reading new data from a database, putting it into a BlockingCollection, and having the data processed by other code. There is 1 new item about every 5 seconds, and processing one item takes from several milliseconds to almost a minute. After running for about 10 minutes, the thread count reached over 1000 threads.

Gruzilkin asked Aug 30 '12 07:08

1 Answer

There are two things that together cause this behavior:

  1. ThreadPool tries to use the optimal number of threads for your situation. But if one of the threads in the pool blocks, the pool sees this as if that thread wasn't doing any useful work and so it tends to create another thread soon after that. What this means is that if you have a lot of blocking, ThreadPool is really bad at guessing the optimal number of threads and it tends to create new threads until it reaches the limit.

  2. Parallel.ForEach() trusts the ThreadPool to guess the correct number of threads, unless you set the maximum number of threads explicitly. Parallel.ForEach() was also primarily meant for bounded collections, not streams of data.

When you combine these two things with GetConsumingEnumerable(), what you get is that Parallel.ForEach() creates threads that are almost always blocked. The ThreadPool sees this, and, to try to keep the CPU utilized, creates more and more threads.
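This thread-injection behavior can be observed in isolation with a small sketch (my own illustration, not part of the original answer): queue more permanently blocking work items than there are cores, and watch the pool grow past Environment.ProcessorCount at roughly one thread per second.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class ThreadInjectionDemo
{
    static void Main()
    {
        // Queue blocking work items. Every pool thread that picks one
        // up blocks forever, so to the ThreadPool it looks like no
        // useful work is happening while the queue is still non-empty.
        for (int i = 0; i < 100; i++)
            Task.Run(() => Thread.Sleep(Timeout.Infinite));

        // The pool responds by injecting extra threads over time.
        for (int s = 0; s < 10; s++)
        {
            Console.WriteLine("Process Thread Count: {0}",
                Process.GetCurrentProcess().Threads.Count);
            Thread.Sleep(1000); // thread count keeps climbing
        }
    }
}
```

The exact injection rate is an implementation detail of the runtime, but the upward trend is the same mechanism that inflates the thread count in the question's code.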

The correct solution here is to set MaxDegreeOfParallelism. If your computations are CPU-bound, the best value is most likely Environment.ProcessorCount. If they are IO-bound, you will have to find out the best value experimentally.
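Applied to the Consumer from the question, this is a small change; Environment.ProcessorCount is assumed here as a reasonable CPU-bound default:

```csharp
static void Consumer()
{
    var options = new ParallelOptions
    {
        // Cap the worker count so the ThreadPool can't keep
        // injecting new threads for workers blocked on the queue.
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };
    Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), options, Run);
}
```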

Another option, if you can use .NET 4.5, is TPL Dataflow. This library was made specifically to process streams of data like yours, so it doesn't have the problems your code has. It's actually even better than that: it doesn't use any threads at all when it isn't currently processing anything.
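A minimal Dataflow version of the producer/consumer pair might look like this (a sketch, assuming the System.Threading.Tasks.Dataflow package is referenced; an ActionBlock replaces both the BlockingCollection and the Parallel.ForEach loop):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class DataflowSketch
{
    static void Main()
    {
        // By default an ActionBlock processes one item at a time
        // (MaxDegreeOfParallelism = 1) and holds no thread while idle.
        var consumer = new ActionBlock<int>(i =>
            Console.WriteLine("Job Processed\tThread: {0}",
                Thread.CurrentThread.ManagedThreadId));

        for (int i = 0; i < 5; i++)
        {
            Console.WriteLine("Enqueuing job");
            consumer.Post(i);       // enqueue one item per second
            Thread.Sleep(1000);
        }

        consumer.Complete();        // no more items
        consumer.Completion.Wait(); // drain and shut down
    }
}
```

Degree of parallelism, bounded capacity, and the scheduler are all configurable through ExecutionDataflowBlockOptions if one consumer at a time isn't enough.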

Note: There is also a good reason why a new thread is created for each new item, but explaining that would require me to explain how Parallel.ForEach() works in more detail, and I feel that's not necessary here.

svick answered Sep 24 '22 17:09