I would be glad for some input on the following implementation of a BroadcastCopyBlock in TPL Dataflow. It copies each received message to all consumers registered with the BroadcastCopyBlock and guarantees delivery to every consumer that is linked to the block at the time it receives the message. (This is unlike the BroadcastBlock, which does not guarantee delivery of a message if the next one arrives before the former has been delivered to all consumers.)
My main concern is the reserving of messages and the releasing of reservations. What would happen if a receiving block decides not to handle the message? My understanding is that this would create a memory leak, since the message would be kept indefinitely. I think I should somehow mark the message as unused, but I'm not sure how. I was thinking about some artificial message sink (an ActionBlock with no action), or can I just mark a message as discarded?
Further input on the implementation is also appreciated.
This is probably almost a duplicate of the following question, but I would prefer to use my own class instead of a method to create the block. Or would that be considered bad style?
BroadcastBlock with Guaranteed Delivery in TPL Dataflow
/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message, all consumers receive an identical message
/// </summary>
/// <typeparam name="T"></typeparam>
public class BrodcastCopyBlock<T> : IPropagatorBlock<T, T>
{
    private ITargetBlock<T> In { get; }

    /// <summary>
    /// Holds a TransformBlock for each target, that subscribed to this block
    /// </summary>
    private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks = new Dictionary<ITargetBlock<T>, TransformBlock<T, T>>();

    public BrodcastCopyBlock()
    {
        In = new ActionBlock<T>(message => Process(message));
        In.Completion.ContinueWith(task =>
        {
            if (task.Exception == null)
                Complete();
            else
                Fault(task.Exception);
        });
    }

    /// <summary>
    /// Creates a transform source block for the passed target.
    /// </summary>
    /// <param name="target"></param>
    private void CreateOutBlock(ITargetBlock<T> target)
    {
        if (_OutBlocks.ContainsKey(target))
            return;
        var outBlock = new TransformBlock<T, T>(e => e);
        _OutBlocks[target] = outBlock;
    }

    private void Process(T message)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            outBlock.Post(message);
        }
    }

    /// <inheritdoc />
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        return In.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    /// <inheritdoc />
    public void Complete()
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Complete();
        }
    }

    /// <inheritdoc />
    public void Fault(Exception exception)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Fault(exception);
        }
    }

    /// <inheritdoc />
    public Task Completion => Task.WhenAll(_OutBlocks.Select(b => b.Value.Completion));

    /// <inheritdoc />
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        CreateOutBlock(target);
        return _OutBlocks[target].LinkTo(target, linkOptions);
    }

    /// <inheritdoc />
    public T ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    /// <inheritdoc />
    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ReserveMessage(messageHeader, target);
    }

    /// <inheritdoc />
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        ((ISourceBlock<T>)_OutBlocks[target]).ReleaseReservation(messageHeader, target);
    }
}
TL;DR
Your implementation uses the Post method inside the ActionBlock, which will still lose data if a target rejects the message; switch to SendAsync. You also probably don't need to implement all these methods: implementing the ITargetBlock<in TInput> interface is enough.
I want to clarify something before coming back to your main question. I think you are confused by some options of the TPL Dataflow library, and I want to explain them a bit here. The behavior you describe, "the first consumer which receives the message deletes it from the queue", is not about the BroadcastBlock; it is what happens when multiple consumers are linked to an ISourceBlock such as BufferBlock:
var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => {});
var consumer2 = new ActionBlock<int>(i => { Console.WriteLine(i); });
buffer.LinkTo(consumer1);
buffer.LinkTo(consumer2);
// this message goes to only one consumer (consumer1), so there is no console output
buffer.Post(1);
What the BroadcastBlock does is exactly what you are talking about. Consider this code:
private static void UnboundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Unbounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Unbounded Block: {i}");
    });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}
The output will be
FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2
However, this works only if the incoming data arrives more slowly than it is processed; otherwise the buffers keep growing and you quickly run out of memory, as you stated in your question. Let's see what happens if we use ExecutionDataflowBlockOptions to limit the incoming data buffer of the slow block:
private static void BoundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Bounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Bounded Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}
The output will be
FAST Bounded Block: 0
FAST Bounded Block: 1
FAST Bounded Block: 2
SLOW Bounded Block: 0
SLOW Bounded Block: 1
As you can see, our slow block lost the last message, which is not what we are looking for. The reason is that the BroadcastBlock, by default, uses the Post method to deliver messages. According to the official Intro Document:
- Post - An extension method that asynchronously posts to the target block. It returns immediately whether the data could be accepted or not, and it does not allow for the target to consume the message at a later time.
- SendAsync - An extension method that asynchronously sends to target blocks while supporting buffering. A Post operation on a target is asynchronous, but if a target wants to postpone the offered data, there is nowhere for the data to be buffered and the target must instead be forced to decline. SendAsync enables asynchronous posting of the data with buffering, such that if a target postpones, it will later be able to retrieve the postponed data from the temporary buffer used for this one asynchronously posted message.
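The difference between the two methods can be seen on a single bounded target. The following is only a minimal sketch, not part of the original answer: the class name PostVersusSendAsync and the TaskCompletionSource used to hold the block busy are assumptions made for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of TPL Dataflow.
public static class PostVersusSendAsync
{
    // Reports whether Post and SendAsync got their message accepted
    // by a target whose bounded buffer is already full.
    public static async Task<(bool PostAccepted, bool SendAccepted)> RunAsync()
    {
        // The gate keeps the first message "in processing" until we release it.
        var gate = new TaskCompletionSource<bool>();

        // BoundedCapacity = 1: the block holds at most one message at a time.
        var target = new ActionBlock<int>(_ => gate.Task,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        target.Post(0);                           // accepted, occupies the only slot
        bool postAccepted = target.Post(1);       // the block is full, so Post returns false

        Task<bool> pending = target.SendAsync(2); // postponed, the message stays buffered
        gate.SetResult(true);                     // let the block finish message 0
        bool sendAccepted = await pending;        // completes once the block takes message 2

        target.Complete();
        await target.Completion;
        return (postAccepted, sendAccepted);
    }
}
```

Awaiting the task returned by SendAsync is what gives the sender back-pressure: it resumes only after the target has taken the message.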
So this method can help us in our mission. Let's introduce a wrapper ActionBlock that does exactly what we want: it uses SendAsync to pass the data to our real processors:
private static void BoundedWrapperInfiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Wrapper Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}
The output will be
FAST Wrapper Block: 0
FAST Wrapper Block: 1
FAST Wrapper Block: 2
SLOW Wrapper Block: 0
SLOW Wrapper Block: 1
SLOW Wrapper Block: 2
But this wait will never end: our basic wrapper does not propagate completion to the inner blocks, and an ActionBlock can't be linked to anything. We can try to wait for a wrapper's completion instead:
private static void BoundedWrapperFiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST finite Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW finite Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowActionWrapper.Completion.Wait();
}
The output will be
FAST finite Block: 0
FAST finite Block: 1
FAST finite Block: 2
SLOW finite Block: 0
Which is definitely not what we wanted: the wrapper ActionBlock has finished its job, but nothing waits for the last messages to be processed by the inner block. Moreover, we don't even see the second message, because we exit the method before the Sleep call ends! So you definitely need your own implementation for this.
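One possible shape for such an implementation is sketched below. It is only an illustration under stated assumptions, not the asker's class and not a TPL Dataflow API: the names GuaranteedBroadcast, Create and DemoAsync are hypothetical, the set of targets is fixed at creation time, and a factory method is used instead of a class to keep the sketch short.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of TPL Dataflow.
public static class GuaranteedBroadcast
{
    // Returns a target block that forwards every message to all given targets.
    public static ITargetBlock<T> Create<T>(params ITargetBlock<T>[] targets)
    {
        var input = new ActionBlock<T>(async item =>
        {
            // Awaiting SendAsync means a full (bounded) target delays the
            // broadcast instead of silently missing the message.
            foreach (var target in targets)
                await target.SendAsync(item);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // Manually propagate completion or fault to every target.
        input.Completion.ContinueWith(t =>
        {
            foreach (var target in targets)
            {
                if (t.IsFaulted) target.Fault(t.Exception);
                else target.Complete();
            }
        }, TaskScheduler.Default);

        return input;
    }

    // Small demo: counts how many messages the fast and the slow consumer receive.
    public static async Task<(int Fast, int Slow)> DemoAsync()
    {
        int fast = 0, slow = 0;
        var fastAction = new ActionBlock<int>(_ => { fast++; });
        var slowAction = new ActionBlock<int>(async _ => { await Task.Delay(50); slow++; },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        var broadcast = Create<int>(fastAction, slowAction);
        for (var i = 0; i < 3; ++i)
            await broadcast.SendAsync(i);
        broadcast.Complete();

        // Wait for the real consumers, not for the broadcasting block.
        await Task.WhenAll(fastAction.Completion, slowAction.Completion);
        return (fast, slow);
    }
}
```

The trade-off in this sketch is that the slowest target sets the pace for all of them, because the next message is not forwarded until every target has accepted the current one.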
Now, at last, some thoughts about your code:
- You are implementing a large number of methods that you don't need: your wrapper works as an ITargetBlock<in TInput>, so implement only that interface.
- You are using the Post method inside the ActionBlock, which, as we saw, could lead to data loss in case of some problems on the consumer's side. Consider the SendAsync method instead.
- Your Completion task actually reverses the order of your dataflow: you are waiting for the targets to complete, which I think is not good practice. You should probably create an ending block for your dataflow (this could even be a NullTarget block, which simply synchronously drops the incoming message) and wait for it to complete.
I just want to add to VMAtm's excellent answer that in BoundedWrapperInfiniteCase you can manually propagate the completion. Add the following lines before the call to broadcast.SendAsync(), then wait for both actions to complete, so that the action wrappers complete the inner actions:
slowActionWrapper.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)slowAction).Fault(t.Exception);
    else slowAction.Complete();
});
fastActionWrapper.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)fastAction).Fault(t.Exception);
    else fastAction.Complete();
});
e.g.
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
    Thread.Sleep(2000);
    Console.WriteLine($"SLOW Wrapper Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
// Manually propagate completion to the inner actions
slowActionWrapper.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)slowAction).Fault(t.Exception);
    else slowAction.Complete();
});
fastActionWrapper.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)fastAction).Fault(t.Exception);
    else fastAction.Complete();
});
for (var i = 0; i < 3; ++i)
    broadcast.SendAsync(i);
broadcast.Complete();
// Wait for both inner actions to complete
Task.WaitAll(slowAction.Completion, fastAction.Completion);
The output will be the same as in VMAtm's answer, but all tasks will properly complete.
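Since the same continuation is written once per wrapper, it could be pulled into a small helper. This is just a sketch; the extension method PropagateCompletionTo is a hypothetical name and not something TPL Dataflow provides.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of TPL Dataflow.
public static class CompletionPropagation
{
    // When 'source' finishes, completes 'target' or faults it with the same exception.
    public static void PropagateCompletionTo(this IDataflowBlock source, IDataflowBlock target)
    {
        source.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) target.Fault(t.Exception);
            else target.Complete();
        }, TaskScheduler.Default);
    }
}
```

With it, the two continuations in the example would shrink to slowActionWrapper.PropagateCompletionTo(slowAction) and fastActionWrapper.PropagateCompletionTo(fastAction).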