
TPL Dataflow: create aggregated result array from all incoming nodes (multiple producers, 1 consumer)

Please note the following code sample. I need an aggregator node that can be linked to any number of sources, waits for every source to send one message, and then combines those messages into a result[].

This should be obvious and straightforward, but somehow I cannot find a solution. I checked JoinBlock and TransformBlock, but neither seems to fit.

using System;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp2
{
    internal class Program
    {
        private static readonly uint _produceCount = 0;
        private static void Main(string[] args)
        {

            BufferBlock<string> p1 = new BufferBlock<string>();
            BufferBlock<string> p2 = new BufferBlock<string>();

            // a block is required that accepts n sources as input, waits for all inputs to arrive, and then creates a result array from all inputs

            ActionBlock<string[]> c1 = new ActionBlock<string[]>((inputs) =>
            {
                // Use ", " as the separator so the output matches the desired format below
                Console.WriteLine(String.Join(", ", inputs));
            });

            p1.Post("Produce 1.1");
            p2.Post("Produce 2.1");

            // desired output:
            // "Produce 1.1, Produce 2.1"
            // actually the order is of no importance at this time

        }


    }
}

[Edit] Further clarification: I would like to have a block that dynamically awaits all source nodes (at the point in time the first message arrives) to complete, and then aggregates the results to pass on to follower nodes.

Martin Meeser asked Oct 19 '25 13:10


2 Answers

You can use a non-greedy BatchBlock for this. In non-greedy mode the block postpones offered messages until enough distinct sources have offered one to fill a batch, so each source contributes exactly one item to the batch. This was originally suggested here. Here is a tested example. As proof, note that source1 is sent multiple items, and only the first of them shows up in the batch:

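using System;
using System.Linq;                      // ToList()
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using NUnit.Framework;                  // assumed: the [Test] attribute below looks like NUnit

public class DataAggregator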
{
    private BatchBlock<string> batchBlock = new BatchBlock<string>(5, new GroupingDataflowBlockOptions() { Greedy = false });
    private ActionBlock<string[]> writer = new ActionBlock<string[]>(strings => strings.ToList().ForEach(str => Console.WriteLine(str)));
    private BufferBlock<string> source1 = new BufferBlock<string>();
    private BufferBlock<string> source2 = new BufferBlock<string>();
    private BufferBlock<string> source3 = new BufferBlock<string>();
    private BufferBlock<string> source4 = new BufferBlock<string>();
    private BufferBlock<string> source5 = new BufferBlock<string>();

    public DataAggregator()
    {
        source1.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source2.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source3.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source4.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source5.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        batchBlock.LinkTo(writer, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    [Test]
    public async Task TestPipeline()
    {
        source1.Post("string1-1");
        source1.Post("string1-2");
        source1.Post("string1-3");
        source2.Post("string2-1");
        source3.Post("string3-1");
        source4.Post("string4-1");
        source5.Post("string5-1");
        //Should print string1-1 string2-1 string3-1 string4-1 string5-1
        source1.Complete();
        source2.Complete();
        source3.Complete();
        source4.Complete();
        source5.Complete();
        await writer.Completion;
    }
}

Output:

string1-1
string2-1
string3-1
string4-1
string5-1

JSteward answered Oct 21 '25 03:10


If you know your sources beforehand, I'd use a JoinBlock together with a TransformBlock. You would have to create a BufferBlock for each source.

First, the JoinBlock waits for one message from each source and packs them in one tuple. Then the TransformBlock creates a result array from the intermediate tuple.
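
A minimal sketch of that JoinBlock plus TransformBlock combination, assuming exactly two sources. The class name JoinAggregatorSketch and the helper JoinOnceAsync are made up for illustration. Note that the built-in JoinBlock only comes in two-input and three-input variants, so this does not scale to an arbitrary number of sources.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class JoinAggregatorSketch
{
    // Hypothetical helper: posts one message to each of two sources
    // and returns the array produced by the join + transform pipeline.
    public static async Task<string[]> JoinOnceAsync(string first, string second)
    {
        var source1 = new BufferBlock<string>();
        var source2 = new BufferBlock<string>();

        // JoinBlock emits a Tuple once each of its targets has received one message.
        var join = new JoinBlock<string, string>();

        // Convert the intermediate tuple into the desired string[].
        var toArray = new TransformBlock<Tuple<string, string>, string[]>(
            pair => new[] { pair.Item1, pair.Item2 });

        source1.LinkTo(join.Target1);
        source2.LinkTo(join.Target2);
        join.LinkTo(toArray);

        source1.Post(first);
        source2.Post(second);

        return await toArray.ReceiveAsync();
    }

    public static async Task Main()
    {
        string[] result = await JoinOnceAsync("Produce 1.1", "Produce 2.1");
        Console.WriteLine(string.Join(", ", result)); // prints "Produce 1.1, Produce 2.1"
    }
}
```

Because each source is wired to its own target, the position of a message in the array always identifies the source it came from.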

If you do not know your sources beforehand, you need to explain how you expect your new block to know when to produce a result. That logic should then be put into a custom block, probably in the form of a TransformManyBlock<string,string[]>.

If you want to join a dynamic number of sources, you can create an unlimited join block like this:

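// Requires: using System; using System.Collections.Generic; using System.Linq;
//           using System.Threading.Tasks.Dataflow;
private static void Main()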
{
    var source1 = new BufferBlock<string>();
    var source2 = new BufferBlock<string>();
    var source3 = new BufferBlock<string>();
    var aggregator = CreateAggregatorBlock( 3 );
    var result = new ActionBlock<string[]>( x => Console.WriteLine( string.Join( ", ", x ) ) );
    source1.LinkTo( aggregator );
    source2.LinkTo( aggregator );
    source3.LinkTo( aggregator );
    aggregator.LinkTo( result );

    source1.Post( "message 1" );
    source2.Post( "message 2" );
    source3.Post( "message 3" );

    Console.ReadLine();
}

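private static TransformManyBlock<string, string[]> CreateAggregatorBlock( int sources )
{
    // Shared state is safe here because the block processes one message at a time
    // (the default MaxDegreeOfParallelism is 1).
    var buffer = new List<string>();
    return new TransformManyBlock<string, string[]>( message => {
        buffer.Add( message );
        if( buffer.Count == sources )
        {
            // One message per expected source has arrived: emit the batch and start over.
            var result = buffer.ToArray();
            buffer.Clear();
            return new[] {result};
        }
        // Not enough messages yet: emit nothing.
        return Enumerable.Empty<string[]>();
    } );
}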

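This assumes your sources produce messages at the same rate. Otherwise a fast source could fill the buffer with several of its own messages before the slower sources have sent anything. If that's the case, you need to put the identity of the source next to the message and keep a buffer for each source.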
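
The per-source buffering could look roughly like the sketch below. It is only an illustration under assumptions: the class name TaggedAggregatorSketch, the Create helper, and the choice to tag each message with an integer source index are all hypothetical, and it relies on C# 7 tuple syntax.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

public static class TaggedAggregatorSketch
{
    // Hypothetical helper: emits one string[] each time every source
    // has at least one message waiting in its own queue.
    public static TransformManyBlock<(int Source, string Message), string[]> Create(int sources)
    {
        // One queue per source; safe without locks because the block
        // handles one message at a time by default.
        var queues = Enumerable.Range(0, sources)
                               .Select(_ => new Queue<string>())
                               .ToArray();

        return new TransformManyBlock<(int Source, string Message), string[]>(tagged =>
        {
            queues[tagged.Source].Enqueue(tagged.Message);

            // Some source has not delivered yet: emit nothing.
            if (queues.Any(q => q.Count == 0))
                return Enumerable.Empty<string[]>();

            // Take the oldest message from every source, in source order.
            return new[] { queues.Select(q => q.Dequeue()).ToArray() };
        });
    }

    public static void Main()
    {
        var aggregator = Create(2);
        var printer = new ActionBlock<string[]>(
            x => Console.WriteLine(string.Join(", ", x)));
        aggregator.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });

        // Source 0 runs ahead of source 1.
        aggregator.Post((0, "a1"));
        aggregator.Post((0, "a2"));
        aggregator.Post((1, "b1"));
        aggregator.Post((1, "b2"));

        aggregator.Complete();
        printer.Completion.Wait();
    }
}
```

In a real pipeline each source would probably be linked through a small TransformBlock that attaches its index before the message reaches the aggregator.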

Haukinger answered Oct 21 '25 01:10


