I am attempting to use TPL Dataflow to create a pipeline. All is working fine so far, with my pipeline defined as follows (although my issue is just with broadcaster, submissionSucceeded, submissionFailed):
// Define tasks
var productListingBatchBuffer = new BufferBlock<PostSubmissionState>();
var splitFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SplitFile(s));
var saveFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SaveFile(s));
var postSubmission = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => PostSubmission(s));
var broadcaster = new BroadcastBlock<PostSubmissionState>(state => state);
var submissionSucceeded = new ActionBlock<PostSubmissionState>(s => SubmissionSucceeded(s));
var submissionFailed = new ActionBlock<PostSubmissionState>(s => SubmissionFailed(s));
// Link em up
productListingBatchBuffer.LinkTo(splitFile, new DataflowLinkOptions() { PropagateCompletion = true });
splitFile.LinkTo(saveFile, new DataflowLinkOptions() { PropagateCompletion = true });
saveFile.LinkTo(postSubmission, new DataflowLinkOptions() { PropagateCompletion = true });
postSubmission.LinkTo(broadcaster, new DataflowLinkOptions() { PropagateCompletion = true });
broadcaster.LinkTo(submissionSucceeded, new DataflowLinkOptions() { PropagateCompletion = true }, state => state.PostSucceeded);
broadcaster.LinkTo(submissionFailed, new DataflowLinkOptions() { PropagateCompletion = true }, state => !state.PostSucceeded);
The issue I have is with the propagation of Exceptions. Because my BroadcastBlock propagates its completion (and therefore any Fault) to two blocks, if an exception does occur, it gets propagated to both blocks. Thus when I do
Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);
I end up with an aggregate exception containing two exceptions. Right now the best I can do is to filter these, i.e.:
try
{
Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);
}
catch (AggregateException ex)
{
var uniqueExceptions = new AggregateException(ex.Flatten().InnerExceptions.Distinct());
Console.WriteLine("An exception was thrown.\n{0}", uniqueExceptions.Flatten());
}
but I'm wondering if there's a better way to do this. i.e. if only one exception occurs, I only want one exception raised. I'm new to Dataflow, so just discovering all the conventions.
I've written a TPL DataFlow example (https://github.com/squideyes/PodFetch) that takes a slightly different approach to completion and error handling. Here's the relevant code from Line's 171 to 201 of Program.cs:
scraper.LinkTo(fetcher, link => link != null);
scraper.LinkTo(DataflowBlock.NullTarget<Link>());
scraper.HandleCompletion(fetcher);
Status.Info.Log("Fetching APOD's archive list");
links.ForEach(link => scraper.Post(link));
scraper.Complete();
try
{
await fetcher.Completion;
Status.Finished.Log("Fetched: {0:N0}, Skipped: {1:N0}, Errors: {2:N0}, Seconds: {3:N2}",
fetched, skipped, errored, (DateTime.UtcNow - startedOn).TotalMilliseconds / 1000.0);
}
catch (AggregateException errors)
{
foreach (var error in errors.InnerExceptions)
Status.Failure.Log(error.Message);
}
catch (TaskCanceledException)
{
Status.Cancelled.Log("The process was manually cancelled!");
}
catch (Exception error)
{
Status.Failure.Log(error.Message);
}
As you can see, I link a couple of TPL blocks together then get primed for handling completion using a HandleCompletion extension method:
public static void HandleCompletion(
this IDataflowBlock source, params IDataflowBlock[] targets)
{
source.Completion.ContinueWith(
task =>
{
foreach (var target in targets)
{
if (task.IsFaulted)
target.Fault(task.Exception);
else
target.Complete();
}
});
}
Very importantly, I call scraper.Complete() when I'm done passing in objects to the first block in the chain. With that, the HandleCompletion extension method then deals with continuation. And, since I'm waiting on fetcher (the last block in the chain to complete), it's easy to catch any resulting errors within a try/catch.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With