I have a producer/consumer dataflow pipeline set up using BufferBlock and ActionBlock, and it works fine inside a console application: after adding all items into the BufferBlock and linking the BufferBlock to the action blocks, everything processes as expected.
Now I want to use this inside a service, where the dataflow pipeline stays up permanently, and whenever messages arrive through external events they are posted into the BufferBlock and processing starts. How can I achieve this?
So far I have done the following:
public void SetupPipeline()
{
    FirstBlock = new ActionBlock<WorkItem>(
        new Action<WorkItem>(ProcessIncomingMessage),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    BufferBlock = new BufferBlock<WorkItem>();

    GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
    GroupingDataflowBlockOptions.Greedy = true;
    GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;

    CancellationTokenSource = new CancellationTokenSource();
    CancellationToken = CancellationTokenSource.Token;
    GroupingDataflowBlockOptions.CancellationToken = CancellationToken;

    BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);

    ProcessItems = new ActionBlock<WorkItem[]>(
        WorkItems => ProcessWorkItems(WorkItems.ToList<WorkItem>()),
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = CancellationToken
        });

    Timer = new Timer(_ => BatchBlock.TriggerBatch());

    TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
    {
        Timer.Change(TimerInterval, Timeout.Infinite);
        logger.Debug("Inside TimingBlock : " + WorkItem.ToString());
        return WorkItem;
    },
    new ExecutionDataflowBlockOptions
    {
        CancellationToken = CancellationToken
    });

    BatchBlock.LinkTo(ProcessItems);
    TimingBlock.LinkTo(BatchBlock);
    BufferBlock.LinkTo(TimingBlock);
}
Your batch size is defined by the variable 'BoundingCapacity' in the BatchBlock constructor. A batch will be posted when the block has received that many items, when TriggerBatch() is called, or when the block is completed (any leftover items are flushed as a final, smaller batch).
It seems like you want a batch to post when the batch size is met or a timeout occurs. If that is the case, and if the exact batch size is not critical, I would simply give the timer you already have a recurring interval and make the block downstream of the BatchBlock ignore empty posts (see the sketch below).
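A minimal sketch of that approach, reusing the field names from the question (Timer, BatchBlock, ProcessItems, TimerInterval, CancellationToken); the method name SetupRecurringFlush is illustrative, and the empty-batch check is purely defensive:

public void SetupRecurringFlush()
{
    // Recurring timer: flush whatever the BatchBlock has gathered on every tick.
    Timer = new Timer(
        _ => BatchBlock.TriggerBatch(),
        state: null,
        dueTime: TimerInterval,
        period: TimerInterval);

    // The downstream block skips any batch that arrives empty.
    ProcessItems = new ActionBlock<WorkItem[]>(workItems =>
    {
        if (workItems.Length == 0)
            return;
        ProcessWorkItems(workItems.ToList());
    },
    new ExecutionDataflowBlockOptions { CancellationToken = CancellationToken });
}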
What you may actually want, and what is most in line with the philosophy of dataflow programming, is to create a new BatchBlock when you begin posting a series of items and then complete it when done or when a timeout occurs. New posts would create a new BatchBlock if one does not already exist (a sketch follows).
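One possible sketch of that pattern. BoundingCapacity, TimerInterval and ProcessItems come from the question; the Post method, the gate lock and the one-BatchBlock-per-burst bookkeeping are illustrative:

private BatchBlock<WorkItem> currentBatch;
private readonly object gate = new object();

public void Post(WorkItem item)
{
    lock (gate)
    {
        if (currentBatch == null)
        {
            // A new burst begins: create a fresh BatchBlock and link it downstream.
            currentBatch = new BatchBlock<WorkItem>(BoundingCapacity);
            currentBatch.LinkTo(ProcessItems);

            // End the burst after the timeout; completing a BatchBlock
            // flushes any remaining partial batch downstream.
            var batch = currentBatch;
            Task.Delay(TimerInterval).ContinueWith(_ =>
            {
                lock (gate)
                {
                    if (currentBatch == batch)
                    {
                        batch.Complete();
                        currentBatch = null;
                    }
                }
            });
        }
        currentBatch.Post(item);
    }
}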
The problem with trying to implement a timeout timer around the BatchBlock that fires only on the first post is that you will either need to count and verify posts into the BufferBlock, or watch posts coming out of the BufferBlock. Both scenarios create a lot of ugliness and/or violate block encapsulation.
As a gross oversimplification, Dataflow is a way to process a bunch of objects using a set of methods. It doesn't provide or expect any specific way of creating these objects.
If you want a pipeline to stay alive, just don't terminate the application. If you don't want to use a console application, create a service that builds the pipeline and sends objects to it until it closes (a rough sketch follows).
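For example, assuming a classic Windows Service whose class also holds the members from the question (SetupPipeline, BufferBlock, ProcessItems); the service name is illustrative:

using System.ServiceProcess;

public class PipelineService : ServiceBase
{
    protected override void OnStart(string[] args)
    {
        SetupPipeline();   // the blocks stay alive as long as the process does
        // hook up whatever event source will post WorkItems here
    }

    protected override void OnStop()
    {
        // Stop accepting new items. If every LinkTo call passed
        // new DataflowLinkOptions { PropagateCompletion = true },
        // completion flows down the chain and the last block drains its queue.
        BufferBlock.Complete();
        ProcessItems.Completion.Wait();
    }
}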
Messages are just objects that you will create by reading data, in response to events (whatever that means), or in any other way.
As for external events, what do you mean by that? That someone will send data to your application? There are many ways this can happen: polling a queue, listening on a socket, exposing a web service endpoint, watching a folder for files, and so on.
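Whatever the source, any event handler can feed the pipeline simply by posting into the BufferBlock. Here a FileSystemWatcher stands in for the external event source; the watched path and the WorkItem constructor taking a string are assumptions:

using System.IO;

var watcher = new FileSystemWatcher(@"C:\inbox");
watcher.Created += (sender, e) =>
    BufferBlock.Post(new WorkItem(e.FullPath));   // WorkItem(string) is assumed
watcher.EnableRaisingEvents = true;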
The point is, Dataflow is about processing data, not about listening to events. It's not a full-blown distributed agent system, if that's what you were looking for.