I have a TransformManyBlock with the following design: it takes a single input item and transforms it into a very large, lazily generated IEnumerable<string>.

I am running this block on a huge file (61 GB), which is too large to fit into RAM. To avoid unbounded memory growth, I have set BoundedCapacity to a very low value (e.g. 1) for this block and all downstream blocks. Nonetheless, the block apparently iterates the IEnumerable greedily, which consumes all available memory on the machine, grinding every process to a halt. The block's OutputCount rises without bound until I kill the process.

What can I do to prevent the block from consuming the IEnumerable in this way?
EDIT: Here's an example program that illustrates the problem:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static IEnumerable<string> GetSequence(char c)
    {
        for (var i = 0; i < 1024 * 1024; ++i)
            yield return new string(c, 1024 * 1024);
    }

    static void Main(string[] args)
    {
        var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
        var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
        var secondBlock = new ActionBlock<string>(str =>
        {
            Console.WriteLine(str.Substring(0, 10));
            Thread.Sleep(1000);
        }, options);
        firstBlock.LinkTo(secondBlock);
        firstBlock.Completion.ContinueWith(task =>
        {
            if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
            else secondBlock.Complete();
        });
        firstBlock.Post('A');
        firstBlock.Complete();
        for (; ; )
        {
            Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
            Thread.Sleep(3000);
        }
    }
}
If you're on a 64-bit box, make sure to clear the "Prefer 32-bit" option in Visual Studio. I have 16GB of RAM on my computer, and this program immediately consumes every available byte.
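As an aside, the manual completion-propagation continuation in the example can be replaced with the built-in PropagateCompletion link option, which forwards both successful completion and faults from firstBlock to secondBlock:

firstBlock.LinkTo(secondBlock, new DataflowLinkOptions { PropagateCompletion = true });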
You seem to misunderstand how TPL Dataflow works. BoundedCapacity limits the number of items you can post into a block. In your case, that means a single char into the TransformManyBlock and a single string into the ActionBlock.

So you post a single item to the TransformManyBlock, which then returns 1024*1024 strings and tries to pass them on to the ActionBlock, which will only accept a single one at a time. The rest of the strings just sit in the TransformManyBlock's output queue.
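You can observe that input-side limit directly. In a minimal sketch (reusing the GetSequence method from the question), a second Post should be declined, because the first item still occupies the block's single capacity slot while its enormous output is being enumerated:

var block = new TransformManyBlock<char, string>(c => GetSequence(c),
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
Console.WriteLine(block.Post('A')); // True  - the single input slot was free
Console.WriteLine(block.Post('B')); // False - declined while 'A' is still in the block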
What you probably want to do is create a single block and post items into it in a streaming fashion, waiting (synchronously or otherwise) when its capacity is reached:
private static void Main()
{
    MainAsync().Wait();
}

private static async Task MainAsync()
{
    var block = new ActionBlock<string>(async item =>
    {
        Console.WriteLine(item.Substring(0, 10));
        await Task.Delay(1000);
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

    foreach (var item in GetSequence('A'))
    {
        // SendAsync completes only when the block has room,
        // so the enumeration is throttled to the consumer's pace.
        await block.SendAsync(item);
    }

    block.Complete();
    await block.Completion;
}
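If you'd rather keep a multi-block pipeline, the same streaming idea works with a bounded BufferBlock that you feed yourself. Here's a sketch (again reusing GetSequence from the question): SendAsync waits whenever the buffer is full, so only a handful of the large strings are alive at any time:

private static async Task MainAsync()
{
    var buffer = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 1 });
    var consumer = new ActionBlock<string>(async item =>
    {
        Console.WriteLine(item.Substring(0, 10));
        await Task.Delay(1000);
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

    buffer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

    // The producer enumerates lazily; SendAsync completes only
    // when the bounded buffer has room for the next string.
    foreach (var item in GetSequence('A'))
        await buffer.SendAsync(item);

    buffer.Complete();
    await consumer.Completion;
}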