I have a producer/consumer pattern in my app implemented using TPL Dataflow. The dataflow mesh is big, with about 40 blocks in it, and has two main functional parts: a producer part and a consumer part. The producer is supposed to continuously provide a lot of work for the consumer, while the consumer sometimes handles incoming work slowly. I want to pause the producer when the consumer is busy with some specified number of work items; otherwise the app consumes a lot of memory/CPU and behaves unsustainably.
I made a demo app that demonstrates the issue:
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false
            };
            var boundedOptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false,
                BoundedCapacity = 5
            };

            var bufferBlock = new BufferBlock<int>(boundedOptions);
            var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
            var broadcastBlock = new BroadcastBlock<int>(x => x, options);
            var consumerBlock = new ActionBlock<int>(async x =>
            {
                var delay = 1000;
                if (x > 10) delay = 5000;
                await Task.Delay(delay);
                Console.WriteLine(x);
            }, boundedOptions);

            producerBlock.LinkTo(bufferBlock);
            bufferBlock.LinkTo(broadcastBlock);
            broadcastBlock.LinkTo(producerBlock);
            broadcastBlock.LinkTo(consumerBlock);

            bufferBlock.Post(1);
            consumerBlock.Completion.Wait();
        }
    }
}
The app prints something like this:
2
1
3
4
5
69055
69053
69054
69057
438028
438040
142303
438079
That means the producer keeps spinning and pushing messages to the consumer. I want it to pause and wait until the consumer has finished its current portion of work, and then the producer should continue providing messages for the consumer.
My question is quite similar to another question, but it wasn't answered properly. I tried that solution and it doesn't work here, still allowing the producer to flood the consumer with messages. Setting BoundedCapacity doesn't work either.
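As far as I can tell, BoundedCapacity on the consumer alone can't throttle this mesh, because BroadcastBlock never waits for a busy target: it always accepts the next message and overwrites the one it currently holds, so a full target simply misses messages instead of exerting back-pressure. A minimal standalone sketch of that behavior (block names and counts are mine, not from the app above):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class BroadcastDropDemo
{
    static async Task Main()
    {
        int received = 0;
        var broadcast = new BroadcastBlock<int>(x => x);

        // Slow, bounded consumer: capacity 1, 50 ms per item.
        var slowConsumer = new ActionBlock<int>(async x =>
        {
            await Task.Delay(50);
            Interlocked.Increment(ref received);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        broadcast.LinkTo(slowConsumer,
            new DataflowLinkOptions { PropagateCompletion = true });

        // Post never blocks here: BroadcastBlock accepts every message
        // and overwrites the previous one if the target hasn't taken it yet.
        for (int i = 0; i < 100; i++)
            broadcast.Post(i);

        broadcast.Complete();
        await slowConsumer.Completion;

        // Typically far fewer than 100 items arrive; the rest were overwritten.
        Console.WriteLine($"Received {received} of 100");
    }
}
```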
The only solution I can see so far is to make my own block that monitors the target block's queue and acts according to that queue's size. But I hope that is overkill for this issue.
If you need to keep the producer → buffer → broadcast cycle intact, then you need to replace the broadcast block with some other block that still broadcasts the messages it receives, but waits when one of its targets is full.
As long as you know the targets of that block when you're creating it, you can build it using ActionBlock (code copied from another answer of mine):
public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    DataflowBlockOptions options, params ITargetBlock<T>[] targets)
{
    var block = new ActionBlock<T>(
        async item =>
        {
            foreach (var target in targets)
            {
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = options.BoundedCapacity,
            CancellationToken = options.CancellationToken
        });

    block.Completion.ContinueWith(task =>
    {
        foreach (var target in targets)
        {
            if (task.Exception != null)
                target.Fault(task.Exception);
            else
                target.Complete();
        }
    });

    return block;
}
Using this, you can declare the broadcast block:
var broadcastBlock = CreateGuaranteedBroadcastBlock(
boundedOptions, producerBlock, consumerBlock);
(You will also need to remove the LinkTo lines that link from broadcastBlock.)
One issue with your original code that this does not fix is completion, but that's a hard problem in TPL Dataflow with cycles in general.
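Completion is hard in a cycle because no block ever sees its input side complete. One workaround I've seen (a sketch only; the counter and block names are mine, not part of the solution above) is to count in-flight items and complete the head of the cycle once the count drops to zero:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class CycleCompletionSketch
{
    static async Task Main()
    {
        int pending = 0;
        BufferBlock<int> head = null;

        // Worker: items below 5 are fed back into the cycle, larger ones are done.
        var worker = new ActionBlock<int>(async x =>
        {
            if (x < 5)
            {
                // Count the feedback item before sending it,
                // so pending never prematurely hits zero.
                Interlocked.Increment(ref pending);
                await head.SendAsync(x + 1);
            }
            Console.WriteLine(x);

            // Last in-flight item is done and produced nothing new:
            // nothing in the cycle can create more work, so complete it.
            if (Interlocked.Decrement(ref pending) == 0)
                head.Complete();
        });

        head = new BufferBlock<int>();
        head.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });

        Interlocked.Increment(ref pending); // count the seed message
        await head.SendAsync(0);
        await worker.Completion;            // finishes once the cycle drains
    }
}
```

The key is that the counter is incremented for a new item before the current item is counted as finished; otherwise the cycle could be declared idle while a feedback message is still in flight.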