
TPL Dataflow, alternative to JoinBlock limitations?

I am looking for an alternative to JoinBlock that n TransformBlocks can link to, and that joins/merges the messages from all of those source blocks so that a collection of them can be passed on to another dataflow block.

JoinBlock does the job, but it is limited to hooking up 3 source blocks. It also suffers from quite a number of inefficiencies (it is very slow to join even value types (ints) from 2 source blocks). Is there a way to have Tasks returned from the TransformBlocks and wait until every TransformBlock has a completed Task to pass on before accepting the Task<item>?

Any alternative ideas? I potentially have 1-20 such transform blocks whose items I need to join together before passing on the joined item collection. Each transform block is guaranteed to return exactly one output item for each input item "transformed".

Edit: Requested clarification:

Per one of my previous questions, I set up my JoinBlocks as follows:

public Test()
{
    broadCastBlock = new BroadcastBlock<int>(i =>
        {
            return i;
        });

    transformBlock1 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    transformBlock2 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    joinBlock = new JoinBlock<int, int>();

    processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
        {
            //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
        });

    //Linking
    broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
    broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
    transformBlock1.LinkTo(joinBlock.Target1);
    transformBlock2.LinkTo(joinBlock.Target2);
    joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}

public void Start()
{
    Stopwatch watch = new Stopwatch();
    watch.Start();

    const int numElements = 1000000;

    for (int i = 1; i <= numElements; i++)
    {
        broadCastBlock.Post(i);
    }

    ////mark completion
    broadCastBlock.Complete();
    Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());


    processorBlock.Completion.Wait();

    watch.Stop();

    // Multiply before dividing so the integer division does not truncate the rate.
    Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements * 1000L / watch.ElapsedMilliseconds);
    Console.ReadLine();
}
Matt asked Dec 02 '12 08:12



1 Answer

One way to do this is to use BatchBlock with Greedy set to false. In this configuration, the block does nothing until n items from n different source blocks are waiting to be consumed (where n is the batch size you set when creating the BatchBlock). When that happens, it consumes all n items at once and produces an array containing all of them.
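
A minimal sketch of that configuration, assuming the same broadcast-then-transform layout as in the question. The class name NonGreedyBatchJoin, the method RunAsync, and its parameters are hypothetical names used only for illustration, and depending on the target framework the System.Threading.Tasks.Dataflow package may need to be referenced:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class NonGreedyBatchJoin
{
    // Hypothetical helper: fans each input out to sourceCount TransformBlocks,
    // joins their outputs with a non-greedy BatchBlock, and returns the
    // number of batches that reached the final block.
    public static async Task<int> RunAsync(int sourceCount, int itemCount)
    {
        var broadcast = new BroadcastBlock<int>(i => i);

        // Greedy = false: the block postpones offered items until it can take
        // one item from each of sourceCount different sources.
        var batch = new BatchBlock<int>(
            sourceCount,
            new GroupingDataflowBlockOptions { Greedy = false });

        int batches = 0;
        var processor = new ActionBlock<int[]>(items =>
        {
            // items holds one value per source; its order is not guaranteed.
            batches++;
        });

        var transforms = Enumerable.Range(0, sourceCount)
            .Select(_ => new TransformBlock<int, int>(i => i))
            .ToArray();

        foreach (var transform in transforms)
        {
            broadcast.LinkTo(transform, new DataflowLinkOptions { PropagateCompletion = true });
            transform.LinkTo(batch); // completion is signalled manually below
        }
        batch.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= itemCount; i++)
        {
            await broadcast.SendAsync(i);
        }
        broadcast.Complete();

        // Complete the batch block only after every source has finished.
        await Task.WhenAll(transforms.Select(t => t.Completion));
        batch.Complete();

        await processor.Completion;
        return batches;
    }
}
```

Because the batch size is a constructor argument, the same code should work for any number of source blocks, not just the 2 or 3 that JoinBlock supports.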

One caveat with this solution is that the resulting array is not sorted: you are not going to know which item came from which source. I also have no idea how its performance compares with JoinBlock; you will have to test that yourself. (Though I would understand if using BatchBlock this way was slower, because of the overhead necessary for non-greedy consumption.)
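
One possible way around the ordering caveat, which is not part of the original suggestion, is to have each TransformBlock tag its output with the index of the block that produced it, and then sort each batch by that tag. The sketch below assumes a non-greedy BatchBlock fed by a broadcast block; TaggedBatchJoin, RunAsync, and the `i * (index + 1)` transform are hypothetical and exist only to make the sources distinguishable:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TaggedBatchJoin
{
    // Hypothetical helper: returns one array per input item, with the values
    // ordered by the index of the TransformBlock that produced them.
    public static async Task<List<int[]>> RunAsync(int sourceCount, int itemCount)
    {
        var joined = new List<int[]>();
        var broadcast = new BroadcastBlock<int>(i => i);

        // Each block emits (source index, value) so the origin survives batching.
        var transforms = Enumerable.Range(0, sourceCount)
            .Select(index => new TransformBlock<int, Tuple<int, int>>(
                i => Tuple.Create(index, i * (index + 1))))
            .ToArray();

        var batch = new BatchBlock<Tuple<int, int>>(
            sourceCount,
            new GroupingDataflowBlockOptions { Greedy = false });

        // Sort each batch by source index, then drop the tag.
        var processor = new ActionBlock<Tuple<int, int>[]>(items =>
            joined.Add(items.OrderBy(t => t.Item1).Select(t => t.Item2).ToArray()));

        foreach (var transform in transforms)
        {
            broadcast.LinkTo(transform, new DataflowLinkOptions { PropagateCompletion = true });
            transform.LinkTo(batch);
        }
        batch.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 1; i <= itemCount; i++)
        {
            await broadcast.SendAsync(i);
        }
        broadcast.Complete();

        await Task.WhenAll(transforms.Select(t => t.Completion));
        batch.Complete();

        await processor.Completion;
        return joined;
    }
}
```

The sort adds work per batch, so this variant would need to be measured against JoinBlock just like the untagged one.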

svick answered Sep 24 '22 10:09