Using TPL Dataflow blocks, is it possible to link two or more sources to a single ITargetBlock (e.g. ActionBlock) and prioritize the sources?
e.g.
BufferBlock<string> b1 = new ...
BufferBlock<string> b2 = new ...
ActionBlock<string> a = new ...
//somehow force messages in b1 to be processed before any message of b2, always
b1.LinkTo (a);
b2.LinkTo (a);
As long as there are messages in b1, I want those to be fed to "a"; once b1 is empty, the messages from b2 should be pushed into "a".
Ideas?
There is nothing like that in TPL Dataflow itself.
The simplest way I can imagine to do this yourself is to create a structure that encapsulates three blocks: high-priority input, low-priority input, and output. Those blocks would be simple BufferBlocks, along with a method that runs in the background and forwards messages from the two inputs to the output based on priority.
The code could look like this:
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class PriorityBlock<T>
{
    private readonly BufferBlock<T> highPriorityTarget;
    public ITargetBlock<T> HighPriorityTarget
    {
        get { return highPriorityTarget; }
    }

    private readonly BufferBlock<T> lowPriorityTarget;
    public ITargetBlock<T> LowPriorityTarget
    {
        get { return lowPriorityTarget; }
    }

    private readonly BufferBlock<T> source;
    public ISourceBlock<T> Source
    {
        get { return source; }
    }

    public PriorityBlock()
    {
        // Capacity of 1 keeps each block from buffering ahead,
        // so the priority decision is made as late as possible.
        var options = new DataflowBlockOptions { BoundedCapacity = 1 };
        highPriorityTarget = new BufferBlock<T>(options);
        lowPriorityTarget = new BufferBlock<T>(options);
        source = new BufferBlock<T>(options);

        // Start the forwarding loop in the background.
        Task.Run(() => ForwardMessages());
    }

    private async Task ForwardMessages()
    {
        while (true)
        {
            // Wait until either input reports on its output.
            await Task.WhenAny(
                highPriorityTarget.OutputAvailableAsync(),
                lowPriorityTarget.OutputAvailableAsync());

            T item;
            // Always try the high-priority input first.
            if (highPriorityTarget.TryReceive(out item))
            {
                await source.SendAsync(item);
            }
            else if (lowPriorityTarget.TryReceive(out item))
            {
                await source.SendAsync(item);
            }
            else
            {
                // Nothing to receive: this is taken to mean
                // that both input blocks have completed.
                source.Complete();
                return;
            }
        }
    }
}
Usage would look like this:
b1.LinkTo(priorityBlock.HighPriorityTarget);
b2.LinkTo(priorityBlock.LowPriorityTarget);
priorityBlock.Source.LinkTo(a);
For this to work, a also has to have BoundedCapacity set to one (or at least a very low number).
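To illustrate why the bounded capacity on a matters, here is a minimal sketch, assuming the System.Threading.Tasks.Dataflow package is referenced. The names BoundedCapacityDemo and gate are made up for the illustration. It shows that a bounded ActionBlock refuses further messages while it is full, which is what keeps the remaining messages upstream where the priority decision can still be made.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedCapacityDemo
{
    public static async Task Main()
    {
        // Hypothetical gate that holds the first message "in flight"
        // until we release it, so the timing is not left to chance.
        var gate = new TaskCompletionSource<bool>();

        var a = new ActionBlock<string>(
            async message =>
            {
                await gate.Task;
                Console.WriteLine("processed: " + message);
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // The first message fills the single slot of the block.
        bool firstAccepted = a.Post("first");

        // While "first" is still being processed the block is full,
        // so this Post is expected to be declined.
        bool secondAccepted = a.Post("second");

        Console.WriteLine("first accepted: " + firstAccepted);
        Console.WriteLine("second accepted: " + secondAccepted);

        // Release the gate and let the block finish.
        gate.SetResult(true);
        a.Complete();
        await a.Completion;
    }
}
```

With a linked source instead of Post(), a full block postpones the offered message rather than dropping it, so the message stays in the upstream block until a has room again. With the default unbounded capacity, a would accept everything immediately and the ordering would be fixed long before a high-priority message arrives.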
The caveat with this code is that it can introduce a latency of two messages (one waiting in the output block, one waiting in SendAsync()). So, if you have a long list of low-priority messages and a high-priority message suddenly comes in, it will be processed only after the two low-priority messages that are already waiting.
If this is a problem for you, it can be solved. But I believe it would require more complicated code that deals with the less public parts of TPL Dataflow, like OfferMessage().