
How to pause a fast producer when the consumer is overwhelmed?

I have a producer/consumer pattern in my app, implemented using TPL Dataflow. It is a big dataflow mesh with about 40 blocks in it. There are two main functional parts in the mesh: a producer part and a consumer part. The producer is supposed to continuously provide a lot of work for the consumer, while the consumer sometimes handles incoming work slowly. I want to pause the producer when the consumer is busy with some specified number of work items. Otherwise the app consumes a lot of memory and CPU and becomes unstable.

I made a demo app that demonstrates the issue:

[image: diagram of the mesh]

using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var options = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false
            };

            var boundedOptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false,
                BoundedCapacity = 5
            };

            var bufferBlock = new BufferBlock<int>(boundedOptions);
            var producerBlock = new TransformBlock<int, int>(x => x + 1, options);
            var broadcastBlock = new BroadcastBlock<int>(x => x, options);

            var consumerBlock = new ActionBlock<int>(async x =>
            {
                var delay = 1000;
                if (x > 10) delay = 5000;

                await Task.Delay(delay);

                Console.WriteLine(x);
            }, boundedOptions);

            producerBlock.LinkTo(bufferBlock);
            bufferBlock.LinkTo(broadcastBlock);
            broadcastBlock.LinkTo(producerBlock);
            broadcastBlock.LinkTo(consumerBlock);

            bufferBlock.Post(1);

            consumerBlock.Completion.Wait();            
        }        
    }
}

The app prints something like this:

2
1
3
4
5
69055
69053
69054
69057
438028
438040
142303
438079

That means the producer keeps spinning and pushing messages to the consumer. I want it to pause and wait until the consumer has finished its current portion of work, and only then continue providing messages to the consumer.

My question is quite similar to another question, but that one wasn't answered properly. I tried the solution from there and it doesn't work here: the producer is still allowed to flood the consumer with messages. Setting BoundedCapacity doesn't help either.

The only solution I can think of so far is to make my own block that monitors the target block's queue and acts according to its state. But I hope that would be overkill for this issue.

kseen asked Aug 13 '16 17:08



1 Answer

If you need to keep the producer → buffer → broadcast cycle intact, then you need to replace the broadcast block with some other block that still broadcasts messages it receives, but waits when one of its targets is full.
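To see why BoundedCapacity alone does not help here: BroadcastBlock only holds the most recent message, and when a bounded target is full it does not wait for that target, it simply moves on to the next message. That matches the gaps in the numbers your app prints. Here is a minimal sketch of that behavior in isolation. The class name, the method name, the 50 ms delay and the item count are all my own choices for illustration, not anything from your mesh.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastDropDemo
{
    // Posts `posted` items through a BroadcastBlock into a slow, bounded
    // consumer and returns how many items the consumer actually processed.
    public static async Task<int> CountDeliveredAsync(int posted)
    {
        int delivered = 0;

        var broadcast = new BroadcastBlock<int>(x => x);

        // Slow consumer that can hold only one item at a time.
        var slowConsumer = new ActionBlock<int>(async x =>
        {
            await Task.Delay(50);
            Interlocked.Increment(ref delivered);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        broadcast.LinkTo(slowConsumer,
            new DataflowLinkOptions { PropagateCompletion = true });

        // The broadcast block accepts every item immediately; it never
        // pushes back on the sender.
        for (int i = 0; i < posted; i++)
            broadcast.Post(i);

        broadcast.Complete();
        await slowConsumer.Completion;
        return delivered;
    }

    public static async Task Main()
    {
        int delivered = await CountDeliveredAsync(1000);
        // Typically far fewer than 1000: items offered while the consumer
        // was full were overwritten by newer ones.
        Console.WriteLine($"posted 1000, delivered {delivered}");
    }
}
```

The exact number delivered depends on timing, so I would not rely on any particular value, only on it being much smaller than the number posted.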

As long as you know the targets of the replacement block when you're creating it, you can build it using ActionBlock (code copied from another answer of mine):

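// Creates a target block that forwards every item it receives to all of the
// given targets, waiting whenever a target is at its BoundedCapacity.
public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    DataflowBlockOptions options, params ITargetBlock<T>[] targets)
{
    var block = new ActionBlock<T>(
        async item =>
        {
            // SendAsync does not complete until the target has accepted
            // (or permanently declined) the item, so a full target pauses
            // this loop instead of losing the message.
            foreach (var target in targets)
            {
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = options.BoundedCapacity,
            CancellationToken = options.CancellationToken
        });

    // Propagate completion or failure of this block to every target.
    block.Completion.ContinueWith(task =>
    {
        foreach (var target in targets)
        {
            if (task.Exception != null)
                target.Fault(task.Exception);
            else
                target.Complete();
        }
    });

    return block;
}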
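The reason the helper above uses SendAsync rather than Post is that Post gives up immediately when a bounded target is full, while SendAsync returns a task that completes once the target has room. A small sketch of the difference, using a BufferBlock with a capacity of 1 as a stand-in for a busy consumer (the class and method names are hypothetical):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SendAsyncDemo
{
    public static (bool secondPost, bool pendingBeforeReceive, bool acceptedAfterReceive) Run()
    {
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        buffer.Post(1);                       // fills the single slot

        // Post does not wait: the block is full, so the item is rejected.
        bool secondPost = buffer.Post(2);

        // SendAsync waits: the returned task stays incomplete while the
        // block is full.
        Task<bool> pending = buffer.SendAsync(3);
        bool pendingBeforeReceive = !pending.IsCompleted;

        buffer.Receive();                     // frees the slot

        // Now the postponed item is accepted and the task completes.
        bool acceptedAfterReceive = pending.Result;

        return (secondPost, pendingBeforeReceive, acceptedAfterReceive);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine($"Post on full block accepted: {result.secondPost}");
        Console.WriteLine($"SendAsync pending while full: {result.pendingBeforeReceive}");
        Console.WriteLine($"SendAsync accepted after Receive: {result.acceptedAfterReceive}");
    }
}
```

Awaiting that task inside the ActionBlock delegate is what holds the whole cycle back while the consumer is busy.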

With CreateGuaranteedBroadcastBlock in place, you can declare the broadcast block:

var broadcastBlock = CreateGuaranteedBroadcastBlock(
    boundedOptions, producerBlock, consumerBlock);

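(You will also need to remove the two LinkTo lines that link from broadcastBlock, and move this declaration below consumerBlock, because both targets have to exist when the helper is called.)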
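Put together, the demo app could look roughly like the sketch below. The block options and the consumer are taken from the question; the class name and the fixed 15 second run time at the end are my own assumptions, added only because the cycle never completes on its own.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PausingMeshSketch
{
    // Same helper as in the answer, repeated so this sketch compiles on its own.
    public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
        DataflowBlockOptions options, params ITargetBlock<T>[] targets)
    {
        var block = new ActionBlock<T>(
            async item =>
            {
                foreach (var target in targets)
                    await target.SendAsync(item);
            },
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = options.BoundedCapacity,
                CancellationToken = options.CancellationToken
            });

        block.Completion.ContinueWith(task =>
        {
            foreach (var target in targets)
            {
                if (task.Exception != null) target.Fault(task.Exception);
                else target.Complete();
            }
        });

        return block;
    }

    public static void Main()
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            EnsureOrdered = false
        };
        var boundedOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            EnsureOrdered = false,
            BoundedCapacity = 5
        };

        var bufferBlock = new BufferBlock<int>(boundedOptions);
        var producerBlock = new TransformBlock<int, int>(x => x + 1, options);

        var consumerBlock = new ActionBlock<int>(async x =>
        {
            var delay = 1000;
            if (x > 10) delay = 5000;
            await Task.Delay(delay);
            Console.WriteLine(x);
        }, boundedOptions);

        // Declared after both targets exist; replaces the BroadcastBlock.
        var broadcastBlock = CreateGuaranteedBroadcastBlock(
            boundedOptions, producerBlock, consumerBlock);

        // Only the links INTO the broadcast block remain; the helper sends
        // to producerBlock and consumerBlock itself.
        producerBlock.LinkTo(bufferBlock);
        bufferBlock.LinkTo(broadcastBlock);

        bufferBlock.Post(1);

        // Assumption for the demo: stop after a fixed time, since nothing
        // ever completes the cycle.
        Task.Delay(TimeSpan.FromSeconds(15)).Wait();
    }
}
```

With this wiring the consumer should see consecutive numbers instead of jumping into the tens of thousands, because the cycle cannot advance while consumerBlock is at its BoundedCapacity.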

One issue with your original code that this does not fix is completion, but that's a hard problem in TPL Dataflow with cycles in general.

svick answered Nov 08 '22 18:11
