I have the following pseudo code:
var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
var a = new ActionBlock<int>(async item =>
{
await Task.Delay(500);
Trace.TraceInformation(
$"Target 1: | Type: {typeof(int).Name} | Thread: {Thread.CurrentThread.ManagedThreadId} | Message: {item}");
// handling some logic but it throws
if (item >= 5) throw new Exception("Something bad happened");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });
queue.LinkTo(a, new DataflowLinkOptions { PropagateCompletion = true });
var targets = new List<ITargetBlock<int>> {queue};
var broadcaster = new ActionBlock<int>(
async item =>
{
var processingTasks = targets.Select(async t =>
{
try
{
// This is condition is always false
// t (bufferblock) has no exceptions. Exception is raised in downstream action block where it sends to
if (!await t.SendAsync(item))
await t.Completion;
}
catch (Exception e)
{
Trace.TraceInformation("Handled exception : " + e.Message);
}
});
try
{
// Neither here the exception is rethrowed
await Task.WhenAll(processingTasks);
}
catch (Exception e)
{
Trace.TraceInformation("Handled exception WhenAll : " + e.Message);
}
});
for (var i = 1; i <= 10; i++)
{
broadcaster.Post(i);
}
The pipeline is configured like that ActionBlock<int> => BufferBlock<int> => ActionBlock<int>
.
The last ActionBlock<int>
throws an exception but it is not rethrown to source block where I would like to handle it.
How this code can be rewritten so it handles exceptions correctly?
You can find the official guidelines for this topic here. Overall solution is to subscribe for all the blocks Completion
task with checking the state of it, and, in case of need, replacing the faulted block (one should store all the references for the blocks too). Please refer to whole article to more information.
Behaviors of a network with
Faulted
blocks
Reserved Messages
In order to avoid message corruption, a faulted block should clear its message queues and move into aFaulted
state as soon as possible. There is a single scenario that does not obey to this rule: a source block holding a message reserved by a target. If a block that encounters an internal exception has a message that was reserved by a target, the reserved message must not be dropped, and the block should not be moved into theFaulted
state until the message is released or consumed.Hanging Networks
...
- Keep a reference to all the blocks in the network and use
Task.WaitAll
orTask.WhenAll
to wait for them (synchronously or asynchronously). If a block faults, itsCompletion
task will complete in theFaulted
state.- Use
DataflowLinkOptions
withPropagateCompletion == true
when building a linear network. That will propagate block completion from source to target. In this case it is enough to wait on the network leaf block.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With