I'm experimenting with TPL Dataflow before porting it into my production code. The production code is a classic producer/consumer system: producers produce messages (from the financial domain) and consumers process those messages.
What I'm interested in is how stable the system will stay if at some point the producers produce much faster than the consumers can handle (will the system blow up, or what will happen?) and, more importantly, what to do in those cases.
So, in an attempt to build a similarly simple application, I came up with the following:
var bufferBlock = new BufferBlock<Item>();

var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    BoundedCapacity = 100000
};

var dataFlowLinkOptions = new DataflowLinkOptions
{
    PropagateCompletion = true
};

var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
    executionDataflowBlockOptions);

bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);

for (int i = 0; i < int.MaxValue; i++)
{
    bufferBlock.SendAsync(GenerateItem());
}

bufferBlock.Complete();
Console.ReadLine();
Item is a very simple class:
internal class Item
{
    public Item(string itemId)
    {
        ItemId = itemId;
    }

    public string ItemId { get; }
}
GenerateItem simply news up an Item:
static Item GenerateItem()
{
    return new Item(Guid.NewGuid().ToString());
}
Now, to imitate a not-so-fast consumer, I made ProcessItem hold for 100 ms:
static async Task ProcessItem(Item item)
{
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    Console.WriteLine($"Processing #{item.ItemId} item.");
}
Executing this results in an OOM exception within 20 seconds or so.
Then I went on and added more consumers (more ActionBlocks, up to 10), which buys some more time, but eventually leads to the same OOM exception.
I also noticed that the GC is under huge pressure (the VS 2015 Diagnostics tool shows the GC running almost all the time), so I introduced object pooling for Item (a very simple one, essentially a ConcurrentBag storing the items), but I'm still hitting the same wall (the OOM exception is thrown).
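For reference, the pool was along these lines (a hypothetical minimal sketch, not the original code; it assumes Item is given a settable ItemId so a recycled instance can be reset before reuse):

```csharp
using System.Collections.Concurrent;

// Hypothetical sketch of the ConcurrentBag-based pool described above.
// Assumes Item.ItemId is made settable, unlike the get-only version shown earlier.
internal sealed class ItemPool
{
    private readonly ConcurrentBag<Item> _bag = new ConcurrentBag<Item>();

    public Item Rent(string itemId)
    {
        // Reuse a pooled instance if one is available; otherwise allocate.
        Item item;
        if (_bag.TryTake(out item))
        {
            item.ItemId = itemId; // reset the recycled instance
            return item;
        }
        return new Item(itemId);
    }

    public void Return(Item item) => _bag.Add(item);
}
```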
To give some detail on what is in memory and why it is running out:

- Most of the memory is occupied by SingleProducerSingleConsumerQueue+Segment<TplDataFlow.Item> and ConcurrentQueue+Segment<TplDataFlow.Item> instances.
- The BufferBlock's InputBuffer is full of Items (Count = 14,562,296).
- Thanks to BoundedCapacity, the ActionBlock(s)' input buffers stay close to the configured number (InputCount = 99,996).

To make sure a slower producer would let the consumers keep up, I made the producer sleep between iterations:
for (int i = 0; i < int.MaxValue; i++)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(50));
    bufferBlock.SendAsync(GenerateItem());
}
And it works fine: no exception is thrown, memory usage stays low, and I don't see any GC pressure anymore.
So I have a few questions:

- The BufferBlock's internal buffer is getting filled with messages very fast, and it holds on to those messages until a consumer comes back to ask for the next one; as a result, the application runs out of memory (due to the filled-up internal buffer of the BufferBlock). Would you agree with this?

I'm using the Microsoft.Tpl.Dataflow package, version 4.5.24, on .NET 4.5 (C# 6). The process is 32-bit.
You've identified the problem nicely: the BufferBlock is filling its input buffer until it hits OOM.
To solve this, you should add a BoundedCapacity option to your buffer block as well. This will throttle producers automatically for you (no need for the Thread.Sleep in your producer).
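For illustration, here is a minimal sketch of the bounded setup, reusing the names from the question (the capacity value is arbitrary):

```csharp
// Sketch: bound BOTH blocks so back-pressure propagates all the way to the producer.
var bufferBlock = new BufferBlock<Item>(new DataflowBlockOptions
{
    BoundedCapacity = 100000 // same limit as the ActionBlock; the value is arbitrary
});

var actionBlock = new ActionBlock<Item>(t => ProcessItem(t),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        BoundedCapacity = 100000
    });

bufferBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

// With a bounded BufferBlock, SendAsync completes only when there is room,
// so awaiting it throttles the producer automatically.
```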
There is a potentially serious problem with the code below:
for (int i = 0; i < int.MaxValue; i++)
{
bufferBlock.SendAsync(GenerateItem()); // Don't do this!
}
The SendAsync method returns a Task, which can be a much heavier object memory-wise than the actual item you are sending to the block. In this specific example the returned task is always completed, because the BufferBlock has unbounded capacity, and so the memory footprint of the task is practically zero (the same cached Task<bool> instance is returned all the time). But after configuring the block with a small BoundedCapacity value, things will quickly get interesting (in an unpleasant way). Each call to SendAsync will soon start returning an incomplete Task, different each time, with a memory footprint of around 200 bytes per task (300 bytes if the CancellationToken parameter is also used). This is obviously not going to scale well.
The solution is to use SendAsync the way it is intended to be used, which means it should be awaited:
for (int i = 0; i < int.MaxValue; i++)
{
    await bufferBlock.SendAsync(GenerateItem()); // It's OK now
}
This way the producer will be asynchronously blocked until there is available room inside the block to accommodate the sent item, which is hopefully what you want. Otherwise, if you don't want to block the producer, don't use the asynchronous SendAsync method; use the synchronous Post method instead:
for (int i = 0; i < int.MaxValue; i++)
{
    var item = GenerateItem();
    while (true)
    {
        bool accepted = bufferBlock.Post(item); // Synchronous call
        if (accepted) break; // Break the inner loop
        if (bufferBlock.Completion.IsCompleted) return; // Break both loops
        // Here do other things for a while, before retrying to post the item
    }
}
Alternatively, you could use the lower-level OfferMessage method (instead of Post or SendAsync):
for (int i = 0; i < int.MaxValue; i++)
{
    var item = GenerateItem();
    while (true)
    {
        var offerResult = ((ITargetBlock<Item>)bufferBlock).OfferMessage(
            new DataflowMessageHeader(1L), item, null, false);
        if (offerResult == DataflowMessageStatus.Accepted) break;
        if (offerResult == DataflowMessageStatus.DecliningPermanently) return;
        // Here do other things for a while, before retrying to offer the item
    }
}
The magic number 1L is a value declared internally in the TPL Dataflow source code, denoting:
A well-known message ID for code that will send exactly one message, or where the exact message ID is not important.