I am creating a Task processor which uses TPL DataFlow. I will follow a producer consumer model where in Producer produces some items to be processed once in a while and consumers keep waiting for new items to arrive. Here is my code:
async Task Main()
{
var runner = new Runner();
CancellationTokenSource cts = new CancellationTokenSource();
Task runnerTask = runner.ExecuteAsync(cts.Token);
await Task.WhenAll(runnerTask);
}
public class Runner
{
public async Task ExecuteAsync(CancellationToken cancellationToken) {
var random = new Random();
ActionMeshProcessor processor = new ActionMeshProcessor();
await processor.Init(cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more
int[] items = GetItems(random.Next(3, 7));
await processor.ProcessBlockAsync(items);
}
}
private int[] GetItems(int count)
{
Random randNum = new Random();
int[] arr = new int[count];
for (int i = 0; i < count; i++)
{
arr[i] = randNum.Next(10, 20);
}
return arr;
}
}
public class ActionMeshProcessor
{
private TransformBlock<int, int> Transformer { get; set; }
private ActionBlock<int> CompletionAnnouncer { get; set; }
public async Task Init(CancellationToken cancellationToken)
{
var options = new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = 5,
BoundedCapacity = 5
};
this.Transformer = new TransformBlock<int, int>(async input => {
await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!
if (input > 15)
{
throw new Exception($"I can't handle this number: {input}");
}
return input + 1;
}, options);
this.CompletionAnnouncer = new ActionBlock<int>(async input =>
{
Console.WriteLine($"Completed: {input}");
await Task.FromResult(0);
}, options);
this.Transformer.LinkTo(this.CompletionAnnouncer);
await Task.FromResult(0); // what do I await here?
}
public async Task ProcessBlockAsync(int[] arr)
{
foreach (var item in arr)
{
await this.Transformer.SendAsync(item); // await if there are no free slots
}
}
}
I added a condition check above to throw an exception to mimic an exceptional case.
Here are my questions:
What is the best way I can handle exceptions in the above mesh without bringing the whole mesh down?
Is there a better way to initialize/start/continue a never ending DataFlow mesh?
Where do I await Completion?
I have looked in to this similar question
Exceptions
There's nothing asynchronous in your init
it could be a standard synchronous constructor. You can handle exceptions in your mesh without taking the mesh down with a simple try catch in the lamda you provide to the block. You can then handle that case by either filtering the result from your mesh or ignoring the result in the following blocks. Below is an example of filtering. For the simple case of an int
you can use an int?
and filter out any value that was null
or of course you could set any type of magic indicator value if you like. If your actually passing around a reference type you can either push out null or mark the data item as dirty in way that can be examined by the predicate on your link.
public class ActionMeshProcessor {
private TransformBlock<int, int?> Transformer { get; set; }
private ActionBlock<int?> CompletionAnnouncer { get; set; }
public ActionMeshProcessor(CancellationToken cancellationToken) {
var options = new ExecutionDataflowBlockOptions {
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = 5,
BoundedCapacity = 5
};
this.Transformer = new TransformBlock<int, int?>(async input => {
try {
await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!
if (input > 15) {
throw new Exception($"I can't handle this number: {input}");
}
return input + 1;
} catch (Exception ex) {
return null;
}
}, options);
this.CompletionAnnouncer = new ActionBlock<int?>(async input =>
{
if (input == null) throw new ArgumentNullException("input");
Console.WriteLine($"Completed: {input}");
await Task.FromResult(0);
}, options);
//Filtering
this.Transformer.LinkTo(this.CompletionAnnouncer, x => x != null);
this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>());
}
public async Task ProcessBlockAsync(int[] arr) {
foreach (var item in arr) {
await this.Transformer.SendAsync(item); // await if there are no free slots
}
}
}
Completion
You can expose Complete()
and Completion
from your processor and use those to await
the completion when your app shutsdown, assuming thats the only time you'd shutdown the mesh. Also, make sure you propagate completion through your links properly.
//Filtering
this.Transformer.LinkTo(this.CompletionAnnouncer, new DataflowLinkOptions() { PropagateCompletion = true }, x => x != null);
this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>());
}
public void Complete() {
Transformer.Complete();
}
public Task Completion {
get { return CompletionAnnouncer.Completion; }
}
Then, based on your sample the most likely place for completion is outside the loop driving your processing:
public async Task ExecuteAsync(CancellationToken cancellationToken) {
var random = new Random();
ActionMeshProcessor processor = new ActionMeshProcessor();
await processor.Init(cancellationToken);
while (!cancellationToken.IsCancellationRequested) {
await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more
int[] items = GetItems(random.Next(3, 7));
await processor.ProcessBlockAsync(items);
}
//asuming you don't intend to throw from cancellation
processor.Complete();
await processor.Completion();
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With