I'm using the C# TPL and I'm having a problem with producer/consumer code: for some reason, the TPL doesn't reuse threads and keeps creating new ones without stopping.
I made a simple example to demonstrate this behavior:
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    // Bounded to one item, so the producer blocks until the consumer takes it.
    static BlockingCollection<int> m_Buffer = new BlockingCollection<int>(1);
    static CancellationTokenSource m_Cts = new CancellationTokenSource();

    static void Producer()
    {
        try
        {
            while (!m_Cts.IsCancellationRequested)
            {
                Console.WriteLine("Enqueuing job");
                m_Buffer.Add(0);
                Thread.Sleep(1000);
            }
        }
        finally
        {
            // Lets GetConsumingEnumerable() finish once the buffer is drained.
            m_Buffer.CompleteAdding();
        }
    }

    static void Consumer()
    {
        Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), Run);
    }

    static void Run(int i)
    {
        Console.WriteLine
            ("Job Processed\tThread: {0}\tProcess Thread Count: {1}",
            Thread.CurrentThread.ManagedThreadId,
            Process.GetCurrentProcess().Threads.Count);
    }

    static void Main(string[] args)
    {
        Task producer = new Task(Producer);
        Task consumer = new Task(Consumer);
        producer.Start();
        consumer.Start();
        Console.ReadKey();
        m_Cts.Cancel();
        Task.WaitAll(producer, consumer);
    }
}
This code creates two tasks, a producer and a consumer. The producer adds one work item every second, and the consumer only prints a string with some information. I would assume that one consumer thread is enough in this situation, because items are processed much faster than they are added to the queue, but what actually happens is that the number of threads in the process grows by 1 every second, as if the TPL were creating a new thread for every item.
After trying to understand what's happening, I also noticed another thing: even though the BlockingCollection has a capacity of 1, after a while the consumer starts getting called in bursts. For example, this is how it starts:
Enqueuing job
Job Processed Thread: 4 Process Thread Count: 9
Enqueuing job
Job Processed Thread: 6 Process Thread Count: 9
Enqueuing job
Job Processed Thread: 5 Process Thread Count: 10
Enqueuing job
Job Processed Thread: 4 Process Thread Count: 10
Enqueuing job
Job Processed Thread: 6 Process Thread Count: 11
And this is how it's processing items less than a minute later:
Enqueuing job
Job Processed Thread: 25 Process Thread Count: 52
Enqueuing job
Enqueuing job
Job Processed Thread: 5 Process Thread Count: 54
Job Processed Thread: 5 Process Thread Count: 54
Because the threads get disposed after the Parallel.ForEach loop finishes (I don't show it in this example, but it was in the real project), I assumed that it has something to do with ForEach specifically. I found this article, http://reedcopsey.com/2010/01/26/parallelism-in-net-part-5-partitioning-of-work/, and I thought that my problem was caused by the default partitioner, so I took a custom partitioner from the TPL Examples that feeds the consumer threads items one by one. Although it fixed the order of execution (got rid of the delay)...
Enqueuing job
Job Processed Thread: 71 Process Thread Count: 140
Enqueuing job
Job Processed Thread: 12 Process Thread Count: 141
Enqueuing job
Job Processed Thread: 72 Process Thread Count: 142
Enqueuing job
Job Processed Thread: 38 Process Thread Count: 143
Enqueuing job
Job Processed Thread: 73 Process Thread Count: 143
Enqueuing job
Job Processed Thread: 21 Process Thread Count: 144
Enqueuing job
Job Processed Thread: 74 Process Thread Count: 145
...it didn't stop threads from growing
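For reference, a minimal sketch of the "one item at a time" partitioning described above, assuming .NET 4.5 or later, where Partitioner.Create accepts EnumerablePartitionerOptions.NoBuffering. This is not the custom partitioner from the TPL Examples; the class name and the finite five-item producer are hypothetical and only there so the sketch terminates on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class NoBufferingSketch // hypothetical name, for illustration only
{
    static void Main()
    {
        var buffer = new BlockingCollection<int>(boundedCapacity: 1);

        // Finite producer (assumption) so the loop below can complete.
        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < 5; i++) buffer.Add(i);
            buffer.CompleteAdding();
        });

        // NoBuffering asks the partitioner to hand out items one by one
        // instead of collecting them into chunks first.
        var partitioner = Partitioner.Create(
            buffer.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(partitioner, item =>
            Console.WriteLine("Got item {0}", item));

        producer.Wait();
    }
}
```

This only changes how items are handed to the loop bodies; it does not put any limit on how many threads the loop may use.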
I know about ParallelOptions.MaxDegreeOfParallelism, but I still want to understand what's happening with the TPL and why it creates hundreds of threads for no reason.
In my project I have code that has to run for hours: it reads new data from a database, puts it into a BlockingCollection, and has the data processed by other code. There is one new item about every 5 seconds, and processing it takes from several milliseconds to almost a minute. After running for about 10 minutes, the thread count reached over 1000 threads.
There are two things that together cause this behavior:
1. ThreadPool tries to use the optimal number of threads for your situation. But if one of the threads in the pool blocks, the pool sees this as if that thread wasn't doing any useful work and so it tends to create another thread soon after that. What this means is that if you have a lot of blocking, ThreadPool is really bad at guessing the optimal number of threads and it tends to create new threads until it reaches the limit.
2. Parallel.ForEach() trusts the ThreadPool to guess the correct number of threads, unless you set the maximum number of threads explicitly. Parallel.ForEach() was also primarily meant for bounded collections, not streams of data.
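The first point can be observed directly. The following is a rough sketch, assuming .NET Core 3.0 or later (for ThreadPool.ThreadCount) and C# 8 (for the using declaration); the class name and the number of blocked tasks are hypothetical. It queues tasks that do nothing but block and prints the pool's thread count once per second. The exact numbers depend on the machine and the runtime version, so no expected output is given.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class BlockedPoolSketch // hypothetical name, for illustration only
{
    static void Main()
    {
        // Arbitrary count (assumption): more blocked tasks than cores.
        int blockedTasks = Environment.ProcessorCount * 4;
        using var gate = new ManualResetEventSlim(false);

        var tasks = new Task[blockedTasks];
        for (int i = 0; i < blockedTasks; i++)
        {
            // Each task holds a pool thread while doing no work at all.
            tasks[i] = Task.Run(() => gate.Wait());
        }

        for (int second = 1; second <= 5; second++)
        {
            Thread.Sleep(1000);
            Console.WriteLine("After {0}s: {1} pool threads",
                second, ThreadPool.ThreadCount);
        }

        // Release the blocked tasks so the program can exit.
        gate.Set();
        Task.WaitAll(tasks);
    }
}
```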
When you combine these two things with GetConsumingEnumerable(), what you get is that Parallel.ForEach() creates threads that are almost always blocked. The ThreadPool sees this, and, to try to keep the CPU utilized, creates more and more threads.
The correct solution here is to set MaxDegreeOfParallelism. If your computations are CPU-bound, the best value is most likely Environment.ProcessorCount. If they are IO-bound, you will have to find out the best value experimentally.
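A minimal sketch of that setting, assuming CPU-bound work and therefore Environment.ProcessorCount as the limit. The class name, the finite five-item producer and the 200 ms delay are hypothetical and only keep the example short and self-terminating.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class BoundedConsumerSketch // hypothetical name, for illustration only
{
    static void Main()
    {
        var buffer = new BlockingCollection<int>(boundedCapacity: 1);

        // Finite producer (assumption) so the sketch ends on its own.
        Task producer = Task.Run(() =>
        {
            try
            {
                for (int i = 0; i < 5; i++)
                {
                    buffer.Add(i);
                    Thread.Sleep(200);
                }
            }
            finally
            {
                buffer.CompleteAdding();
            }
        });

        // Caps how many loop bodies Parallel.ForEach runs concurrently.
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        Parallel.ForEach(buffer.GetConsumingEnumerable(), options, item =>
        {
            Console.WriteLine("Processed {0} on thread {1}",
                item, Environment.CurrentManagedThreadId);
        });

        producer.Wait();
    }
}
```

With the limit in place, the loop no longer relies on the ThreadPool's guess, so blocked workers should stop triggering ever more threads for this loop.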
Another option, if you can use .NET 4.5, is to use TPL Dataflow. This library was made specifically to process streams of data like yours, so it doesn't have the problems your code has. It's actually even better than that: it doesn't use any threads at all while it has nothing to process.
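A rough sketch of what the consumer could look like with TPL Dataflow, assuming the System.Threading.Tasks.Dataflow library is available to the project and C# 7.1 or later for async Main. The class name, the option values and the finite producer loop are hypothetical choices for illustration, not something the question's code prescribes.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowConsumerSketch // hypothetical name, for illustration only
{
    static async Task Main()
    {
        // ActionBlock runs the delegate for each posted item.
        var worker = new ActionBlock<int>(
            item => Console.WriteLine("Processed {0} on thread {1}",
                item, Environment.CurrentManagedThreadId),
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 1,        // mirrors the buffer size of 1
                MaxDegreeOfParallelism = 1  // one item at a time (assumption)
            });

        // Finite producer (assumption) so the sketch ends on its own.
        for (int i = 0; i < 5; i++)
        {
            // SendAsync waits without blocking a thread while the block is full.
            await worker.SendAsync(i);
            await Task.Delay(200);
        }

        worker.Complete();        // no more items will arrive
        await worker.Completion;  // wait until queued items are processed
    }
}
```

Here the block itself takes over the role of both the BlockingCollection and the Parallel.ForEach loop.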
Note: There is also a good reason why a new thread is created for each new item, but explaining that would require me to explain how Parallel.ForEach() works in more detail, and I feel that's not necessary here.