I've created something similar to a web crawler to build a report of the 1000+ web services I need to manage. For this I created a TPL Dataflow pipeline that gets and processes the data. The pipeline I imagined looks a little bit like this (sorry for my Paint skills :D):
[pipeline diagram not included]
I had already created an implementation, and everything worked fine until I ran the pipeline as a whole. I fed 500 objects into the pipeline as input and expected the program to run for a while, but it stopped executing after moving to the execution block. After checking the program's flow, it seemed to me that completion propagated too fast to the dispose block. I created a small sample project with the same pipeline to check whether the problem was my implementation of the input classes or the pipeline itself. The sample code is this:
using System;
using System.Threading.Tasks;

// Assumed: the original sample does not show the enum it relies on.
public enum Type { A, B }

public class Job
{
    public int Ticker { get; set; }
    public Type Type { get; }

    public Job(Type type)
    {
        Type = type;
    }

    public Task Prepare()
    {
        Console.WriteLine("Preparing");
        Ticker = 0;
        return Task.CompletedTask;
    }

    public Task Tick()
    {
        Console.WriteLine("Ticking");
        Ticker++;
        return Task.CompletedTask;
    }

    // Committable when finished, or on every 100000th tick.
    public bool IsCommitable()
    {
        Console.WriteLine("Trying to commit");
        return IsFinished() || (Ticker != 0 && Ticker % 100000 == 0);
    }

    public bool IsFinished()
    {
        Console.WriteLine("Trying to finish");
        return Ticker == 1000000;
    }

    public void IntermediateCleanUp()
    {
        Console.WriteLine("intermediate Cleanup");
        Ticker = Ticker - 120;
    }

    public void finalCleanUp()
    {
        Console.WriteLine("Final Cleanup");
        Ticker = -1;
    }
}
This is my input class, which is fed into the preparation block.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Dataflow
{
    private TransformBlock<Job, Job> _preparationsBlock;
    private BufferBlock<Job> _balancerBlock;
    private readonly ExecutionDataflowBlockOptions _options = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 4
    };
    private readonly DataflowLinkOptions _linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    private TransformBlock<Job, Job> _typeATickBlock;
    private TransformBlock<Job, Job> _typeBTickBlock;
    private TransformBlock<Job, Job> _writeBlock;
    private TransformBlock<Job, Job> _intermediateCleanupBlock;
    private ActionBlock<Job> _finalCleanupBlock;

    public async Task Process()
    {
        CreateBlocks();
        ConfigureBlocks();

        // Feed 500 jobs, alternating between type A and type B.
        for (int i = 0; i < 500; i++)
        {
            await _preparationsBlock.SendAsync(new Job(i % 2 == 0 ? Type.A : Type.B));
        }

        _preparationsBlock.Complete();
        await Task.WhenAll(_preparationsBlock.Completion, _finalCleanupBlock.Completion);
    }

    private void CreateBlocks()
    {
        _preparationsBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Prepare();
            return job;
        }, _options);
        _balancerBlock = new BufferBlock<Job>(_options);
        _typeATickBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Tick();
            return job;
        }, _options);
        _typeBTickBlock = new TransformBlock<Job, Job>(async job =>
        {
            await job.Tick();
            await job.Tick();
            return job;
        }, _options);
        _writeBlock = new TransformBlock<Job, Job>(job =>
        {
            Console.WriteLine(job.Ticker);
            return job;
        }, _options);
        _finalCleanupBlock = new ActionBlock<Job>(job => job.finalCleanUp(), _options);
        _intermediateCleanupBlock = new TransformBlock<Job, Job>(job =>
        {
            job.IntermediateCleanUp();
            return job;
        }, _options);
    }

    private void ConfigureBlocks()
    {
        _preparationsBlock.LinkTo(_balancerBlock, _linkOptions);

        // Route each job to the tick block for its type.
        _balancerBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
        _balancerBlock.LinkTo(_typeBTickBlock, _linkOptions, job => job.Type == Type.B);

        // Feedback loops: keep ticking until the job is committable.
        _typeATickBlock.LinkTo(_typeATickBlock, _linkOptions, job => !job.IsCommitable());
        _typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());
        _typeBTickBlock.LinkTo(_typeBTickBlock, _linkOptions, job => !job.IsCommitable());

        // After writing, either clean up and tick again, or finish.
        _writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
        _writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());
        _intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
    }
}
This is my Dataflow pipeline, representing my "artwork" above :D. All of this is executed by my Scheduler, which is started in Program.cs:
using System;
using System.Timers;

public class Scheduler
{
    private readonly Timer _timer;
    private readonly Dataflow _flow;

    public Scheduler(int interval)
    {
        _timer = new Timer(interval);
        _flow = new Dataflow();
    }

    public void Start()
    {
        _timer.AutoReset = false;
        _timer.Elapsed += _timer_Elapsed;
        _timer.Start();
    }

    private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        try
        {
            _timer.Stop();
            Console.WriteLine("Timer stopped");
            await _flow.Process().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            // Restart the timer whether or not the run succeeded.
            Console.WriteLine("Timer started again.");
            _timer.Start();
        }
    }
}

class Program
{
    static void Main(string[] args)
    {
        var scheduler = new Scheduler(1000);
        scheduler.Start();
        Console.ReadKey();
    }
}
The console output I am getting is:
Timer stopped
Preparing
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Ticking
Trying to commit
Trying to finish
Trying to commit
Trying to finish
It seems the program stops working at that point, because I am not hitting any breakpoints or getting any further. I think all my blocks have already received a completion signal and therefore stop taking new items. So my question is: how do I manage the completion signal so that the pipeline only finishes when there is no more work to do?
The main issue with your flow is the feedback loop to your tick block. This causes two problems.
First: Back Pressure
When _typeATickBlock is linked back to itself, it stops accepting messages once it has reached its capacity. In your case that capacity is 4, so once it holds 3 messages in its output buffer and one being processed, it stops both accepting and passing on messages: the only place those messages can go is its own full input, so the loop deadlocks. You can see this by adding the following line to the block's delegate:
Console.WriteLine($"Tick Block {_typeATickBlock.InputCount}/{_typeATickBlock.OutputCount}");
It will output:
Tick Block 0/3
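The same stall can be reproduced in isolation. Here is a minimal sketch, assuming the System.Threading.Tasks.Dataflow package is referenced; `SelfLoopDemo`, `FifthSendStallsAsync` and the timeout are hypothetical names, not part of your code. A `TransformBlock` with `BoundedCapacity = 4` that is linked to itself accepts four items and then never accepts a fifth, because the only way it can free a slot is to hand an item to its own, already full, input.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SelfLoopDemo
{
    // Returns true if the fifth SendAsync is still pending after the timeout,
    // i.e. the self-linked block has stalled under back pressure.
    public static async Task<bool> FifthSendStallsAsync(TimeSpan timeout)
    {
        var tick = new TransformBlock<int, int>(
            x => x + 1,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 4 });

        // Every output is routed straight back into the same bounded block.
        tick.LinkTo(tick);

        // The first four sends fill the block's capacity.
        for (int i = 0; i < 4; i++)
        {
            await tick.SendAsync(i);
        }

        // An item can only leave by re-entering the same full block,
        // so the count never drops and this send is never accepted.
        Task<bool> fifth = tick.SendAsync(4);
        Task winner = await Task.WhenAny(fifth, Task.Delay(timeout));
        return winner != fifth;
    }
}
```

`FifthSendStallsAsync(TimeSpan.FromMilliseconds(500))` should return true; the timeout only exists so the demo itself does not hang.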
To fix this, you can put any buffering block, a BufferBlock or a TransformBlock, into the loop. The key is the bounded capacity of that buffer. In your case every single message needs to be rerouted back to the tick block, so the buffer's capacity has to match the number of messages in flight at any given time, which here is 500.
_printingBuffer = new TransformBlock<Job, Job>(job =>
{
    Console.WriteLine($"{_printingBuffer.InputCount}/{_printingBuffer.OutputCount}");
    return job;
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 500 });
In your real code you may not know this value, so DataflowBlockOptions.Unbounded may be your best option to avoid locking up your pipeline, but you can also tune the capacity to your incoming volume.
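A minimal sketch of the buffered loop, assuming an `int` stands in for a Job and the number of jobs is known up front; `BufferedLoopDemo`, `count` and `target` are hypothetical names for this illustration. The tick block keeps `BoundedCapacity = 4`, but its feedback path goes through a `BufferBlock` with `DataflowBlockOptions.Unbounded`, so the tick block can always hand off its output and new sends keep getting accepted.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferedLoopDemo
{
    // Sends `count` zeros into a bounded tick block. Each value is incremented and
    // routed back through an unbounded BufferBlock until it reaches `target`,
    // then it leaves the loop. Returns how many values left the loop.
    public static async Task<int> RunAsync(int count, int target)
    {
        if (count <= 0) return 0; // nothing would ever reach the exit block

        int exited = 0;
        var allExited = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Same bounded capacity as in the question.
        var tick = new TransformBlock<int, int>(
            x => x + 1,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 4 });

        // Unbounded feedback buffer: the tick block can always empty its output.
        var feedback = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = DataflowBlockOptions.Unbounded });

        var exit = new ActionBlock<int>(_ =>
        {
            // Signal once the last value has left the loop.
            if (Interlocked.Increment(ref exited) == count)
            {
                allExited.TrySetResult(true);
            }
        });

        tick.LinkTo(exit, new DataflowLinkOptions { PropagateCompletion = true }, x => x >= target);
        tick.LinkTo(feedback, x => x < target);
        feedback.LinkTo(tick);

        for (int i = 0; i < count; i++)
        {
            await tick.SendAsync(0); // with a direct self-link this would hang at the 5th send
        }

        // Every value has exited, so the loop is empty and completing it is safe.
        await allExited.Task;
        tick.Complete();
        feedback.Complete();
        await exit.Completion;
        return exited;
    }
}
```

Because the count is known here, the demo can simply complete the loop once every value has left it; when the count is not known, you need completion handling along the lines of the next section.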
Second: Completion Flow
With a feedback loop in your pipeline, completion propagation becomes more difficult than simply setting the link options. Once completion reaches the tick block, it stops accepting all messages, even the ones that still need to be processed. To avoid this, you need to hold back propagation until all messages have left the loop. First, stop propagation just before the tick block; then check the buffers of every block that participates in the loop; and once all of those buffers are empty, propagate completion (and faults) to the tick block.
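// Once the balancer has completed, wait until every block in the loop is empty
// before completing the tick block. Note that this busy-waits.
_balancerBlock.Completion.ContinueWith(tsk =>
{
    while (!_typeATickBlock.Completion.IsCompleted)
    {
        if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
            && _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0
            && _writeBlock.InputCount == 0 && _writeBlock.OutputCount == 0
            && _intermediateCleanupBlock.InputCount == 0 && _intermediateCleanupBlock.OutputCount == 0)
        {
            _typeATickBlock.Complete();
        }
    }
});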
Last
Your complete ConfigureBlocks method, with the completion setup and the buffer inserted, should look something like this. Note that I'm only propagating completion here, not faults, and I removed the type B branch.
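private void ConfigureBlocks()
{
    _preparationsBlock.LinkTo(_balancerBlock, _linkOptions);

    // Type B branch removed: this assumes only Type.A jobs are posted,
    // otherwise type B jobs stay in the balancer and it never completes.
    _balancerBlock.LinkTo(_typeATickBlock, job => job.Type == Type.A);

    // Complete the tick block only once every block in the loop is empty (busy-waits).
    _balancerBlock.Completion.ContinueWith(tsk =>
    {
        while (!_typeATickBlock.Completion.IsCompleted)
        {
            if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
                && _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0
                && _writeBlock.InputCount == 0 && _writeBlock.OutputCount == 0
                && _intermediateCleanupBlock.InputCount == 0 && _intermediateCleanupBlock.OutputCount == 0)
            {
                _typeATickBlock.Complete();
            }
        }
    });

    // Non-committable jobs loop back to the tick block through the buffer.
    _typeATickBlock.LinkTo(_printingBuffer, job => !job.IsCommitable());
    _printingBuffer.LinkTo(_typeATickBlock);

    _typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());
    _writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
    _writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());
    _intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
}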
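Polling InputCount/OutputCount has two weak spots: it busy-waits, and a job that is currently inside a block's delegate is counted in neither property. A different technique, not the one used above, is to count jobs in flight: increment before a job enters the loop, decrement when it leaves, and complete the tick block when the count reaches zero. The sketch below assumes every job eventually leaves through a single exit block; `LoopJob`, `CountedLoop` and `target` are hypothetical stand-ins for your Job, your pipeline class and its IsFinished check. The count starts at 1 as a "producer token" so it cannot reach zero while jobs are still being sent.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical job type used only for this sketch.
public sealed class LoopJob
{
    public int Ticker { get; set; }
}

// Single-use: the blocks are completed at the end of a run.
public sealed class CountedLoop
{
    // Starts at 1: the producer's token keeps the count above zero until all jobs are sent.
    private int _inFlight = 1;
    private int _finished;

    private readonly TransformBlock<LoopJob, LoopJob> _tick;
    private readonly BufferBlock<LoopJob> _feedback = new BufferBlock<LoopJob>(); // unbounded by default
    private readonly ActionBlock<LoopJob> _exit;

    public CountedLoop(int target)
    {
        _tick = new TransformBlock<LoopJob, LoopJob>(job =>
        {
            job.Ticker++;
            return job;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 4 });

        _exit = new ActionBlock<LoopJob>(job =>
        {
            Interlocked.Increment(ref _finished);
            LeaveLoop(); // this job has left the loop for good
        });

        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        _tick.LinkTo(_exit, propagate, job => job.Ticker >= target);
        _tick.LinkTo(_feedback, job => job.Ticker < target);
        _feedback.LinkTo(_tick);
    }

    // Whoever brings the count to zero completes the loop; nothing is left inside it then.
    private void LeaveLoop()
    {
        if (Interlocked.Decrement(ref _inFlight) == 0)
        {
            _tick.Complete();     // propagates to _exit
            _feedback.Complete(); // already empty at this point
        }
    }

    public async Task<int> RunAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            Interlocked.Increment(ref _inFlight); // count the job before it enters the loop
            await _tick.SendAsync(new LoopJob());
        }

        LeaveLoop(); // release the producer token
        await _exit.Completion;
        return Volatile.Read(ref _finished);
    }
}
```

`new CountedLoop(10).RunAsync(500)` should complete with 500. The counter replaces the polling loop entirely, at the cost of having to identify every point where a job enters or leaves the loop.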
A while back I wrote a blog post about handling completion with feedback loops; the blog is no longer active, but the post can be retrieved from the Wayback Machine. It may provide some more help:
Finding Completion in a Complex Flow: Feedback Loops