TPL Dataflow block consumes all available memory

I have a TransformManyBlock with the following design:

  • Input: Path to a file
  • Output: IEnumerable of the file's contents, one line at a time

I am running this block on a huge file (61GB), which is too large to fit into RAM. In order to avoid unbounded memory growth, I have set BoundedCapacity to a very low value (e.g. 1) for this block, and all downstream blocks. Nonetheless, the block apparently iterates the IEnumerable greedily, which consumes all available memory on the computer, grinding every process to a halt. The OutputCount of the block continues to rise without bound until I kill the process.

What can I do to prevent the block from consuming the IEnumerable in this way?

EDIT: Here's an example program that illustrates the problem:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static IEnumerable<string> GetSequence(char c)
    {
        for (var i = 0; i < 1024 * 1024; ++i)
            yield return new string(c, 1024 * 1024);
    }

    static void Main(string[] args)
    {
        var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
        var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
        var secondBlock = new ActionBlock<string>(str =>
            {
                Console.WriteLine(str.Substring(0, 10));
                Thread.Sleep(1000);
            }, options);

        firstBlock.LinkTo(secondBlock);
        firstBlock.Completion.ContinueWith(task =>
            {
                if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
                else secondBlock.Complete();
            });

        firstBlock.Post('A');
        firstBlock.Complete();
        for (; ; )
        {
            Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
            Thread.Sleep(3000);
        }
    }
}

If you're on a 64-bit box, make sure to clear the "Prefer 32-bit" option in Visual Studio. I have 16GB of RAM on my computer, and this program immediately consumes every available byte.

Asked Jun 23 '15 at 05:06 by Brian Berns



1 Answer

You seem to misunderstand how TPL Dataflow works.

BoundedCapacity limits the number of items you can post into a block. In your case that means a single char into the TransformManyBlock and a single string into the ActionBlock.

So you post a single item to the TransformManyBlock which then returns 1024*1024 strings and tries to pass them on to the ActionBlock which will only accept a single one at a time. The rest of the strings will just sit there in the TransformManyBlock's output queue.
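
The input-side limit described above can be seen in isolation with a small sketch. The class name BoundedDemo and the gate used to hold the block busy are hypothetical, introduced only for illustration. The sketch assumes that, with BoundedCapacity = 1, Post is declined while an item is still being processed, whereas SendAsync waits until there is room:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class BoundedDemo
{
    // Hypothetical demo: returns whether each of three offers was accepted.
    public static async Task<(bool first, bool second, bool third)> RunAsync()
    {
        // The gate keeps the first item "in progress" until we release it.
        var gate = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        var block = new ActionBlock<int>(
            async _ => await gate.Task,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // The block is empty, so this item is accepted.
        bool first = block.Post(1);

        // The block is now at capacity; Post does not wait, it reports failure.
        bool second = block.Post(2);

        // SendAsync returns a task that completes once the block has room.
        Task<bool> pending = block.SendAsync(3);

        // Let the first item finish so the pending item can be accepted.
        gate.SetResult(true);
        bool third = await pending;

        block.Complete();
        await block.Completion;
        return (first, second, third);
    }
}
```

Note that nothing here limits what a TransformManyBlock places in its own output queue; the sketch only shows the bound on accepted input.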

What you probably want to do is create a single block and post items into it in a streaming fashion by waiting (synchronously or otherwise) when its capacity is reached:

private static void Main()
{
    MainAsync().Wait();
}

private static async Task MainAsync()
{
    var block = new ActionBlock<string>(async item =>
    {
        Console.WriteLine(item.Substring(0, 10));
        await Task.Delay(1000);
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

    foreach (var item in GetSequence('A'))
    {
        await block.SendAsync(item);
    }

    block.Complete();
    await block.Completion;
}
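
For the file scenario in the question, the same pattern might look like the sketch below. LineStreamer, StreamLinesAsync, and the handleLine callback are hypothetical names, not part of TPL Dataflow. It assumes File.ReadLines is used as the lazy line source, so only the line currently being sent and the lines held by the bounded block are in memory:

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class LineStreamer
{
    // Hypothetical helper: streams a file's lines into a bounded ActionBlock
    // and returns how many lines were handled.
    public static async Task<long> StreamLinesAsync(
        string path, Func<string, Task> handleLine, int boundedCapacity = 1)
    {
        long processed = 0;

        var block = new ActionBlock<string>(async line =>
        {
            await handleLine(line);
            // Safe without locking: the default MaxDegreeOfParallelism is 1.
            processed++;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

        // File.ReadLines enumerates lazily, reading lines as they are requested.
        foreach (var line in File.ReadLines(path))
        {
            // Waits while the block is full; yields false if the block
            // will never accept the item (for example, after it faults).
            if (!await block.SendAsync(line))
                break;
        }

        block.Complete();
        await block.Completion; // rethrows any exception thrown by handleLine
        return processed;
    }
}
```

A call such as `await LineStreamer.StreamLinesAsync(path, line => Task.Delay(10))` would then read the file only as fast as the handler consumes it.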

Answered Sep 29 '22 at 13:09 by i3arnon