How to implement continuously running dataflow blocks in TPL?

I have a producer/consumer dataflow pipeline set up using BufferBlock and ActionBlock, and it works fine inside a console application.

After adding all items into the BufferBlock and linking the BufferBlock to the other action blocks, everything processes as expected.

Now I want to use this inside a service, where the dataflow pipeline will always be up: when messages arrive through external events, they go into the BufferBlock and processing starts. How can I achieve this?

Here is what I have so far:

public void SetupPipeline()
{
    // Handles each incoming message as it arrives, with unbounded parallelism.
    FirstBlock = new ActionBlock<WorkItem>(ProcessIncomingMessage,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    BufferBlock = new BufferBlock<WorkItem>();

    // Options for the batching stage.
    var batchOptions = new GroupingDataflowBlockOptions
    {
        Greedy = true,
        BoundedCapacity = GroupingDataflowBlockOptions.Unbounded
    };
    CancellationTokenSource = new CancellationTokenSource();
    CancellationToken = CancellationTokenSource.Token;
    batchOptions.CancellationToken = CancellationToken;
    BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, batchOptions);

    // Consumes each completed batch of work items.
    ProcessItems = new ActionBlock<WorkItem[]>(workItems =>
        ProcessWorkItems(workItems.ToList()),
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = CancellationToken
        });

    // Flush timer; it is armed by the TimingBlock below and, when it fires,
    // pushes out whatever partial batch has accumulated.
    Timer = new Timer(_ => BatchBlock.TriggerBatch());

    // Pass-through stage that (re)starts the one-shot flush timer per item.
    TimingBlock = new TransformBlock<WorkItem, WorkItem>(workItem =>
    {
        Timer.Change(TimerInterval, Timeout.Infinite);
        logger.Debug("Inside TimingBlock : " + workItem);
        return workItem;
    }, new ExecutionDataflowBlockOptions
    {
        CancellationToken = CancellationToken
    });

    // Wire the stages: BufferBlock -> TimingBlock -> BatchBlock -> ProcessItems.
    BatchBlock.LinkTo(ProcessItems);
    TimingBlock.LinkTo(BatchBlock);
    BufferBlock.LinkTo(TimingBlock);
}
asked Dec 02 '13 by user2757350

2 Answers

Your batch size is defined by the BoundingCapacity argument passed to the BatchBlock constructor. A batch will be posted when:

  • the number of items received equals the batch size (as set in the constructor),
  • the block is marked for completion, or
  • the TriggerBatch method is called.

It seems like you want a batch to post when the batch size is met or a timeout occurs. If that is the case, and if the exact batch size is not critical, I would simply give the timer you already have a recurring interval and make the block downstream of the BatchBlock ignore empty posts.
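For illustration, here is a minimal sketch of that approach. It reuses the field names from the question (Timer, BatchBlock, ProcessItems, TimerInterval); the recurring period and the defensive empty-batch guard are the only changes:

// Fire TriggerBatch on a recurring interval rather than as a one-shot.
Timer = new Timer(_ => BatchBlock.TriggerBatch(), null, TimerInterval, TimerInterval);

// The downstream consumer ignores empty posts, as suggested above.
ProcessItems = new ActionBlock<WorkItem[]>(workItems =>
{
    if (workItems.Length == 0)
        return;
    ProcessWorkItems(workItems.ToList());
});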

What you may actually want, and what is most in line with the philosophy of dataflow programming, is to create a new BatchBlock when you begin posting a series of items and then complete it when done or when a timeout occurs. New posts would create a new BatchBlock if one does not already exist.
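Here is a rough, hypothetical sketch of that pattern, assuming the question's WorkItem type and a downstream target that accepts WorkItem[]; the class and member names are illustrative, not part of the library:

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public class BurstBatcher
{
    private readonly ITargetBlock<WorkItem[]> _target;
    private readonly int _batchSize;
    private readonly TimeSpan _timeout;
    private readonly object _gate = new object();
    private BatchBlock<WorkItem> _current;
    private Timer _timer;

    public BurstBatcher(ITargetBlock<WorkItem[]> target, int batchSize, TimeSpan timeout)
    {
        _target = target;
        _batchSize = batchSize;
        _timeout = timeout;
    }

    public void Post(WorkItem item)
    {
        lock (_gate)
        {
            if (_current == null)
            {
                // First post of a burst: create a fresh block and link it downstream.
                _current = new BatchBlock<WorkItem>(_batchSize);
                _current.LinkTo(_target);
                _timer = new Timer(_ => CompleteBurst());
            }
            _current.Post(item);
            _timer.Change(_timeout, Timeout.InfiniteTimeSpan); // (re)arm the timeout
        }
    }

    private void CompleteBurst()
    {
        lock (_gate)
        {
            if (_current != null)
            {
                // Completing the block flushes any partial batch to the target.
                _current.Complete();
                _current = null;
            }
            if (_timer != null)
            {
                _timer.Dispose();
                _timer = null;
            }
        }
    }
}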

The problem with trying to implement a timeout timer around the BatchBlock that only fires based on the first trigger is that you will either need to count and verify posts to the BufferBlock or you will need to watch posts coming out of it. Both of these scenarios create a lot of ugliness and/or violate block encapsulation.

answered Oct 17 '22 by VoteCoffee

As a gross oversimplification, Dataflow is a way to process a bunch of objects using a set of methods. It doesn't provide or expect any specific way of creating those objects.

If you want the pipeline to stay alive, just don't terminate the application. If you don't want to use a console application, create a service that builds the pipeline and sends objects to it until it shuts down.
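To make that concrete, here is a hedged sketch of a Windows service hosting the pipeline from the question; SetupPipeline, BufferBlock, and ProcessItems are the question's members, and OnMessageArrived is a hypothetical handler for whatever external source delivers messages:

using System.ServiceProcess;

public partial class WorkItemService : ServiceBase
{
    protected override void OnStart(string[] args)
    {
        SetupPipeline();            // build the blocks once; they stay up
        // Subscribe OnMessageArrived to whatever raises the external events.
    }

    // Hypothetical handler: called whenever a message arrives from outside.
    private void OnMessageArrived(WorkItem item)
    {
        BufferBlock.Post(item);     // feed the always-running pipeline
    }

    protected override void OnStop()
    {
        BufferBlock.Complete();     // stop accepting new items
        // Waiting on the last block drains in-flight work, but only if each
        // LinkTo used new DataflowLinkOptions { PropagateCompletion = true }.
        ProcessItems.Completion.Wait();
    }
}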

Messages are just objects that you create by reading data, in response to events (whatever that means in your case), or in any other way.

As for external events, what do you mean by that? That someone will send data to your application? There are many ways this can happen:

  • If the data comes from another console application, you can pipe the output of one application into the other, parse the data arriving on your application's input stream, create messages, and pass them to the pipeline.
  • If you want a service listening for requests, you can host a .NET pipe, WCF, or Web API service that listens for calls and passes the posted data to the pipeline (a minimal sketch follows this list).
  • If the data comes from a database, you may be able to poll for changes and send any changed data to the pipeline.
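As a hedged example of the second option, a minimal ASP.NET Web API controller could forward posted data into the pipeline; Pipeline.BufferBlock here is a hypothetical singleton standing in for however the host process exposes the pipeline:

using System.Web.Http;

public class WorkItemsController : ApiController
{
    [HttpPost]
    public IHttpActionResult Post(WorkItem item)
    {
        // Post returns false if the block has completed or is at capacity.
        if (Pipeline.BufferBlock.Post(item))
            return Ok();
        return InternalServerError();
    }
}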

The point is, Dataflow is about processing data, not about listening for events. It's not a full-blown distributed agent system, if that's what you were looking for.

answered Oct 17 '22 by Panagiotis Kanavos