
TPL Dataflow - very fast producer, not so fast consumers OutOfMemory exception

I'm experimenting with TPL Dataflow before porting it into my production code. The production code is a classical producer/consumer system - producer(s) produce messages (related to financial domain), consumers process those messages.

What I'm interested in is how stable the environment will stay if at some point the producer(s) produce much faster than the consumers can handle (will the system blow up, or what will happen?), and more importantly what to do in those cases.

So, in an attempt to have a similarly simple application, I came up with the following.

var bufferBlock = new BufferBlock<Item>();

var executiondataflowBlockOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    BoundedCapacity = 100000
};

var dataFlowLinkOptions = new DataflowLinkOptions
{
    PropagateCompletion = true
};

var actionBlock1 = new ActionBlock<Item>(t => ProcessItem(t),
    executiondataflowBlockOptions);

bufferBlock.LinkTo(actionBlock1, dataFlowLinkOptions);
for (int i = 0; i < int.MaxValue; i++)
{
    bufferBlock.SendAsync(GenerateItem());
}

bufferBlock.Complete();
Console.ReadLine();

Item is a very simple class

internal class Item
{
    public Item(string itemId)
    {
        ItemId = itemId;
    }

    public string ItemId { get; }
}

GenerateItem simply news up an Item:

static Item GenerateItem()
{
   return new Item(Guid.NewGuid().ToString());
}

Now, to imitate a not-so-fast consumer, I made ProcessItem pause for 100 ms.

static async Task ProcessItem(Item item)
{
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    Console.WriteLine($"Processing #{item.ItemId} item.");
}

Executing this results in an OOM exception in 20 or so seconds.

Then I went on and added more consumers (more ActionBlocks, up to 10), which buys some more time, but eventually results in the same OOM exception.

I also noticed that the GC is under huge pressure (the VS 2015 Diagnostics tool shows the GC running almost all the time), so I introduced object pooling for Item (a very simple one, essentially a ConcurrentBag storing items), but I still hit the same wall (the same OOM exception is thrown).

To give some detail on what is in memory and why it runs out:

  • The biggest objects by size are of type SingleProducerSingleConsumerQueue+Segment<TplDataFlow.Item> and ConcurrentQueue+Segment<TplDataFlow.Item>
  • I see that the BufferBlock's InputBuffer is full of Items (Count=14,562,296)
  • Since I set BoundedCapacity on the ActionBlock(s), their input buffers are also close to the configured number (InputCount=99,996)

To make sure that a slower producer would let the consumers keep up, I made the producer sleep between iterations:

for (int i = 0; i < int.MaxValue; i++)
{
    Thread.Sleep(TimeSpan.FromMilliseconds(50));
    bufferBlock.SendAsync(GenerateItem());
}

And it works fine - no exception is thrown, memory usage stays constantly low, and I don't see any GC pressure anymore.

So I have a few questions:

  1. Am I doing anything inherently wrong in trying to reproduce the very fast producer / slow consumer(s) scenario with TPL Dataflow building blocks?
  2. Is there any way to make this work and not fail with an OOM exception?
  3. Any comments/links on best practices for handling this kind of scenario (very fast producer / slow consumers) in a TPL Dataflow context?
  4. My understanding of the problem is: since the consumers can't keep up, the BufferBlock's internal buffer fills with messages very fast and holds on to them until one of the consumers comes back to ask for the next message, and as a result the application runs out of memory (due to the filled-up internal buffer of the BufferBlock) - would you agree with this?

I'm using the Microsoft.Tpl.Dataflow package, version 4.5.24, on .NET 4.5 (C# 6). The process is 32-bit.

asked Oct 31 '16 by Michael


2 Answers

You've identified the problem nicely: the BufferBlock is filling its input buffer until it hits OOM.

To solve this, you should set the BoundedCapacity option on your buffer block as well. This will throttle the producer automatically for you (no need for the Thread.Sleep in your producer).
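A minimal sketch of the bounded setup, using the types from the question (this assumes the loop runs inside an async method so the producer can await each send, which is also what the second answer recommends; the capacity of 100,000 is simply the value from the question, not a recommendation):

```csharp
// Bound BOTH blocks so back-pressure propagates all the way to the producer.
var bufferBlock = new BufferBlock<Item>(new DataflowBlockOptions
{
    BoundedCapacity = 100000
});

var actionBlock = new ActionBlock<Item>(t => ProcessItem(t),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        BoundedCapacity = 100000
    });

bufferBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < int.MaxValue; i++)
{
    // When the buffer is full, this task completes only after room frees up,
    // so the producer is throttled instead of growing the queue without bound.
    await bufferBlock.SendAsync(GenerateItem());
}

bufferBlock.Complete();
await actionBlock.Completion;
```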

answered Oct 16 '22 by Stephen Cleary


There is a potentially serious problem with the code below:

for (int i = 0; i < int.MaxValue; i++)
{
    bufferBlock.SendAsync(GenerateItem()); // Don't do this!
}

The SendAsync method returns a Task, which can be a much heavier object memory-wise than the actual item you are sending to the block. In this specific example the returned task is always completed, because the BufferBlock has unbounded capacity, so the memory footprint of the task is practically zero (the same cached Task<bool> instance is returned every time). But after configuring the block with a small BoundedCapacity value, things will quickly get interesting (in an unpleasant way). Each call to SendAsync will soon start returning an incomplete Task, a different one each time, with a memory footprint of around 200 bytes per task (300 bytes if the CancellationToken parameter is also used). This is obviously not going to scale well.

The solution is to use SendAsync the way it is intended to be used, which means awaiting it:

for (int i = 0; i < int.MaxValue; i++)
{
    await bufferBlock.SendAsync(GenerateItem()); // It's OK now
}

This way the producer will be asynchronously blocked until there is available room inside the block to accommodate the sent item, which is hopefully what you want. Otherwise, if you don't want to block the producer, don't use the asynchronous SendAsync method; use the synchronous Post method instead:

for (int i = 0; i < int.MaxValue; i++)
{
    var item = GenerateItem();
    while (true)
    {
        bool accepted = bufferBlock.Post(item); // Synchronous call
        if (accepted) break; // Break the inner loop
        if (bufferBlock.Completion.IsCompleted) return; // Break both loops

        // Here do other things for a while, before retrying to post the item
    }
}

Alternatively you could use the lower-level OfferMessage method (instead of Post or SendAsync):

for (int i = 0; i < int.MaxValue; i++)
{
    var item = GenerateItem();
    while (true)
    {
        var offerResult = ((ITargetBlock<Item>)bufferBlock).OfferMessage(
            new DataflowMessageHeader(1L), item, null, false);
        if (offerResult == DataflowMessageStatus.Accepted) break;
        if (offerResult == DataflowMessageStatus.DecliningPermanently) return;

        // Here do other things for a while, before retrying to offer the item
    }
}

The magic number 1L is a value declared internally in the TPL Dataflow source code, denoting:

A well-known message ID for code that will send exactly one message, or where the exact message ID is not important.

answered Oct 16 '22 by Theodor Zoulias