Is there any way to limit performance degradation with TPL Dataflow throttling?
I have a complicated pipeline of components and trying to limit a memory requirements needed. I read in parallel from multiple files, components in a pipeline might do some addition reading from random part of those files, the rest of components do CPU bound operations.
I simplified a performance test-bench into these tests using a common test method.
private void TPLPerformaceTest(int generateNumbers,
ExecutionDataflowBlockOptions transformBlockOptions)
{
var transformBlock = new TransformBlock<int, int>(i => i, transformBlockOptions);
var storedCount = 0;
var generatedCount = 0;
var store = new ActionBlock<int>(i => Interlocked.Increment(ref storedCount));
transformBlock.LinkTo(store);
transformBlock.Completion.ContinueWith(_ => store.Complete());
for (int i = 0; i < generateNumbers; i++)
{
transformBlock.SendAsync(i).Wait(); //To ensure delivery
Interlocked.Increment(ref generatedCount);
}
transformBlock.Complete();
store.Completion.Wait();
Assert.IsTrue(generatedCount == generateNumbers);
Assert.IsTrue(storedCount == generateNumbers);
}
The first that has no throttling. On my CPU it takes about 12s to complete, consumes about 800MB of RAM and average CPU utilization is about 35%.
[Test]
public void TPLPerformaceUnlimitedTest()
{
var generateNumbers = 100_000_000;
this.TPLPerformaceTest(generateNumbers,new ExecutionDataflowBlockOptions());
}
Second test that just sets BoundedCapacity to int.MaxValue, thus does no limitation at all, takes 20-30s to complete, consumes 2.1GB of RAM and average CPU utilization is about 50%. According to manual, BoundedCapacity should be set to int.MaxValue by default, so I do not see the reason for performance degradation.
[Test]
[Sequential]
public void TPLPerformaceBounedCapacityTest()
{
var generateNumbers = 100_000_000;
this.TPLPerformaceTest(generateNumbers,new ExecutionDataflowBlockOptions()
{ BoundedCapacity = Int32.MaxValue });
}
The third test limit BoundedCapacity to generateNumbers / 1000, ergo 100,000. It takes 60s to complete and consumes 450MB of RAM and average CPU utilization is about 60%.
[Test]
[Sequential]
public void TPLPerformaceBounedCapacityTenthTest()
{
var generateNumbers = 100_000_000;
this.TPLPerformaceTest(generateNumbers,new ExecutionDataflowBlockOptions()
{ BoundedCapacity = generateNumbers / 1000 });
}
The fourth test limits MaxDegreeOfParallelism to -1 which is according to the manual no limit. It consumed 27GB of RAM and average CPU utilization was about 85% and has not finished in 5 minutes.
[Test]
[Sequential]
public void TPLPerformaceMaxDegreeOfParallelismTest()
{
var generateNumbers = 100_000_000;
this.TPLPerformaceTest(generateNumbers, new ExecutionDataflowBlockOptions()
{ MaxDegreeOfParallelism = -1 });
}
All methods seems to affect performance really hard and do not behave due to my reasonable expectations.
You're degrading the performance because of this line:
transformBlock.SendAsync(i).Wait(); //To ensure delivery
This blocks current thread before the delivery is done. You should switch to await
to free the thread for other tasks to proceed:
await transformBlock.SendAsync(i); //To ensure delivery
Update:
I'm confused about your words that
According to manual,
BoundedCapacity
should be set toint.MaxValue
by default
because this is not true, from official documentation:
BoundedCapacity
The majority of the dataflow blocks included inSystem.Threading.Tasks.Dataflow.dll
support the specification of a bounded capacity.
This is the limit on the number of items the block may be storing and have in flight at any one time.
By default, this value is initialized toDataflowBlockOptions.Unbounded
(-1
), meaning that there is no limit.
Here you can see all the default values after running this code:
var options = new ExecutionDataflowBlockOptions();
So second test with BoundedCapacity
set to int.MaxValue
does add a limitation, which add some checks for place availability in block's buffers.
You can see similar behavior in third test, which consumes much less memory than second, but do more checks and waiting times to buffers to free the space, so it works slower, but with tiny memory to allocate.
Also, you can see on the screenshot that the MaxDegreeOfParallelism
is equal to 1
:
MaxDegreeOfParallelism
By default, an individual dataflow block processes only one message at a time, queueing all not-yet-processed messages so that they may be processed when the currently processing message is completed.
After setting this parameter to -1
, you open the Pandora's Box, because all the messages are being executed at the same time by the same task scheduler, according to the documentation again:
If set to
DataflowBlockOptions.Unbounded
(-1
), any number of messages may be processed concurrently, with the maximum automatically managed by the underlying scheduler targeted by the dataflow block.
As I can see in memory consumption, the task scheduler decides to start new threads for each message as there is no available in the thread pool, which takes about 1MB
per message, so you have a about a 27000
threads which are fighting each other to the CPU time. And as you can see, they aren't good at that.
The recommended degree of parallelism usually is a Environment.ProcessorCount
, so if you want to speed up your one block, you can set the MaxDegreeOfParallelism
to this property. However, in more complicated scenarios this can be not always the best option, as other blocks will stop in waiting for a CPU time.
So what is your reasonable expectations
for this?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With