
TPL Dataflow: Flatten incoming collection to sequential items

I'm building an application using TPL Dataflow and have the following problem. I have a transform block, var tfb1 = new TransformBlock<InMsg, IReadOnlyCollection<OutMsg>>. So tfb1 receives one input message and creates a list of output messages. This list shall be linked to a router dataflow block that takes OutMsg as input (and not IReadOnlyCollection<OutMsg>).

How can I flatten the IReadOnlyCollection so that each contained message can be used as input for, e.g., a transform block of the form TransformBlock<OutMsg, SomeOtherType>? Is it possible via LinkTo()?

Thx

Moerwald asked Jul 19 '17 12:07


1 Answer

You can use a TransformManyBlock instead of a TransformBlock to flatten any IEnumerable<T> result, e.g.:

var flattenBlock = new TransformManyBlock<InMsg,OutMsg>(msg=>{
    var myResultList = new List<OutMsg>();
    //Calculate some results and add them to myResultList
    return myResultList;
});

This will pass individual OutMsg instances to the next block.
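
As a rough, self-contained illustration of the flattening behaviour, here is a minimal sketch. It assumes a .NET 6+ console project with top-level statements, and it uses string as a stand-in for both InMsg and OutMsg; the collector block and the sample inputs are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Stand-in types (assumption): string plays the role of InMsg and OutMsg.
var flattenBlock = new TransformManyBlock<string, string>(line =>
{
    // Build the complete result list for one input message.
    var words = new List<string>(line.Split(' '));
    return words;
});

// Hypothetical downstream block that just collects what it receives.
var received = new List<string>();
var collector = new ActionBlock<string>(word => received.Add(word));

// PropagateCompletion lets the collector finish once flattenBlock is done.
flattenBlock.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

flattenBlock.Post("a b c");
flattenBlock.Post("d e");
flattenBlock.Complete();
await collector.Completion;

Console.WriteLine(string.Join(",", received)); // prints "a,b,c,d,e"
```

Two input messages go in, five individual items come out, in order, because both blocks run with their default settings (one message at a time, ordered output).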

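You can use an iterator so individual messages can be propagated as they are produced. A C# lambda cannot contain yield return, so put the iterator in a separate method (or local function) and call it from the block's delegate:

IEnumerable<OutMsg> GetResults(InMsg msg)
{
    //Calculate some results
    foreach(var item in someDbResults)
    {
        yield return item;
    }
}

var flattenBlock = new TransformManyBlock<InMsg,OutMsg>(msg=>GetResults(msg));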

Or you can just flatten the input:

var flattenBlock = new TransformManyBlock<IEnumerable<OutMsg>,OutMsg>(items=>items);

You can link this block to any block that accepts OutMsg:

var block = new ActionBlock<OutMsg>(...);

flattenBlock.LinkTo(block);
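
Applied to the setup from the question, the pass-through flatten block sits between tfb1 and the block that consumes single messages. The following is only a sketch under assumptions: a .NET 6+ console project with top-level statements, string standing in for InMsg and OutMsg, int standing in for SomeOtherType, and made-up names (lengths, sink) for the downstream blocks.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Stand-in types (assumptions): string for InMsg/OutMsg, int for SomeOtherType.
var tfb1 = new TransformBlock<string, IReadOnlyCollection<string>>(
    line => new List<string>(line.Split(' ')));

// Pass-through flatten step: one collection in, its items out one by one.
var flatten = new TransformManyBlock<IReadOnlyCollection<string>, string>(items => items);

// Consumes single messages, like TransformBlock<OutMsg, SomeOtherType> in the question.
var lengths = new TransformBlock<string, int>(word => word.Length);

// Hypothetical sink that records the results.
var results = new List<int>();
var sink = new ActionBlock<int>(n => results.Add(n));

var link = new DataflowLinkOptions { PropagateCompletion = true };
tfb1.LinkTo(flatten, link);
flatten.LinkTo(lengths, link);
lengths.LinkTo(sink, link);

tfb1.Post("alpha beta");
tfb1.Post("gamma");
tfb1.Complete();
await sink.Completion;

Console.WriteLine(string.Join(",", results)); // prints "5,4,5"
```

So LinkTo() alone does not flatten anything; it is the TransformManyBlock in the middle that turns each collection into individual messages.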

You can route messages by passing a predicate to LinkTo. For example, if you want to route failure messages to a logging block, you can type:

flattenBlock.LinkTo(logBlock,msg=>msg.HasError);
flattenBlock.LinkTo(happyBlock);

Messages that don't match any predicate will get stuck in the output buffer and prevent the block from completing. By having one link without a predicate, you ensure that all messages will be processed.
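
A minimal sketch of this routing, with a catch-all link as the last link, could look like the following. It assumes a .NET 6+ console project with top-level statements; string stands in for OutMsg, and an "ERR:" prefix check stands in for msg.HasError, both purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Stand-in (assumption): string instead of OutMsg, "ERR:" prefix instead of HasError.
var flattenBlock = new TransformManyBlock<IEnumerable<string>, string>(items => items);

var errors = new List<string>();
var successes = new List<string>();
var logBlock = new ActionBlock<string>(msg => errors.Add(msg));
var happyBlock = new ActionBlock<string>(msg => successes.Add(msg));

var link = new DataflowLinkOptions { PropagateCompletion = true };

// Links are tried in the order they were added: filtered link first...
flattenBlock.LinkTo(logBlock, link, msg => msg.StartsWith("ERR:"));
// ...then the catch-all link, so no message is left in the output buffer.
flattenBlock.LinkTo(happyBlock, link);

IEnumerable<string> batch = new[] { "ok-1", "ERR:disk", "ok-2" };
flattenBlock.Post(batch);
flattenBlock.Complete();
await Task.WhenAll(logBlock.Completion, happyBlock.Completion);

Console.WriteLine(string.Join(",", errors));    // prints "ERR:disk"
Console.WriteLine(string.Join(",", successes)); // prints "ok-1,ok-2"
```

If a catch-all target makes no sense in your pipeline, linking to DataflowBlock.NullTarget<T>() as the last link is one way to discard unmatched messages instead of letting them block completion.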

Panagiotis Kanavos answered Sep 19 '22 10:09