How to process items in parallel and then merge the results?

I am confronted with the following problem:

I have a data stream of Foo objects, which I stream to several concurrent in-process tasks/threads that process the objects and output FooResult objects. Among other members, each FooResult contains the same Foo that was used to create it. However, not every Foo necessarily produces a FooResult.

My problem is that I want this whole process to pass on a wrapping object that contains the original Foo and all FooResult objects (if any) that were created from that Foo within the concurrent tasks.

Note: I currently use TPL Dataflow, where each concurrent process runs within an ActionBlock<Foo> that is linked from a BroadcastBlock<Foo>. Each action block uses SendAsync() to send any FooResult it creates to a target dataflow block. The concurrent dataflow blocks obviously produce FooResult objects at unpredictable times, and that is what I am struggling with. I cannot figure out how many FooResult objects were created across all the ActionBlock<Foo> instances together, so I cannot bundle them with the originating Foo and pass them on as a wrapping object.

In pseudocode it currently looks as follows:

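// Requires System.Threading.Tasks.Dataflow.
var broadCastBlock = new BroadcastBlock<Foo>(foo => foo);
var targetBlock = new ActionBlock<FooResult>(fooResult => { /* consume results */ });

// Blocks must be created before they are linked.
var aBlock1 = new ActionBlock<Foo>(async foo =>
{
    FooResult fooResult = null;
    // Do something here; sometimes this creates a FooResult. If it does:
    if (fooResult != null)
        await targetBlock.SendAsync(fooResult);
});

// aBlock2 is built the same way.

broadCastBlock.LinkTo(aBlock1);
broadCastBlock.LinkTo(aBlock2);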

However, the problem with the current code is that targetBlock may not receive anything if a Foo did not produce a single FooResult in any of the action blocks. It could also receive two FooResult objects for the same Foo, because each action block produced one, and nothing tells it that those two belong together.

What I want is for targetBlock to receive a wrapping object that contains each Foo and, if any FooResult objects were created, a collection of those FooResult objects.
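A minimal sketch of such a wrapping object, assuming C# 9 records (.NET 5 or later). `FooBundle`, `From`, and the placeholder shapes of `Foo` and `FooResult` are hypothetical, not the asker's real types. Each worker reports either one result or `null`, and the nulls are dropped, so a Foo with no results still yields exactly one bundle with an empty collection.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Placeholder shapes for the question's Foo and FooResult (hypothetical).
public record Foo(int Id);
public record FooResult(Foo Source, string Detail);

// Hypothetical wrapper: the originating Foo plus every FooResult created from it (possibly none).
public record FooBundle(Foo Original, IReadOnlyList<FooResult> Results)
{
    // One argument per worker; null means "this worker produced no FooResult".
    // OfType<FooResult>() drops the nulls, so a Foo without results gets an empty list.
    public static FooBundle From(Foo foo, params FooResult?[] perWorker) =>
        new FooBundle(foo, perWorker.OfType<FooResult>().ToList());
}

public static class Program
{
    public static void Main()
    {
        var foo = new Foo(1);
        var none = FooBundle.From(foo, null, null);
        var both = FooBundle.From(foo, new FooResult(foo, "worker 1"), new FooResult(foo, "worker 2"));
        Console.WriteLine($"{none.Results.Count} {both.Results.Count}"); // prints "0 2"
    }
}
```

The hard part is not the wrapper itself but knowing when every worker has reported for a given Foo, which is what the answer below addresses.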

Any ideas what I could do to make the solution work as described? It does not have to use TPL Dataflow, but it would be neat if it did.

UPDATE: The following is what I got by implementing JoinBlock as suggested by svick. I am not going to use it (unless its performance can be tweaked) because it is extremely slow: I get to about 89000 items per second, and those are only int values.

using System;
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;

public class Test
{
    private BroadcastBlock<int> broadCastBlock;
    private TransformBlock<int, int> transformBlock1;
    private TransformBlock<int, int> transformBlock2;
    private JoinBlock<int, int, int> joinBlock;
    private ActionBlock<Tuple<int, int, int>> processorBlock;

    public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, int>(i =>
            {
                return i;
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        transformBlock2 = new TransformBlock<int, int>(i =>
            {
                return i;
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        joinBlock = new JoinBlock<int, int, int>();

        processorBlock = new ActionBlock<Tuple<int, int, int>>(tuple =>
            {
                //Console.WriteLine("original value: " + tuple.Item1 + "tfb1: " + tuple.Item2 + "tfb2: " + tuple.Item3);
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });

        broadCastBlock.LinkTo(joinBlock.Target1, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(joinBlock.Target2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(joinBlock.Target3, new DataflowLinkOptions { PropagateCompletion = true });

        joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();

        const int numElements = 1000000;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.Post(i);
        }

        // Mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        watch.Stop();

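        // Use floating-point seconds: integer division of numElements by milliseconds truncates the rate.
        Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " ms - items processed per second: " + (long)(numElements / watch.Elapsed.TotalSeconds));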
        Console.ReadLine();
    }
}

Update of the code to reflect suggestions:

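using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Test
{
    private BroadcastBlock<int> broadCastBlock;
    private TransformBlock<int, int> transformBlock1;
    private TransformBlock<int, int> transformBlock2;
    private JoinBlock<int, int> joinBlock;
    private ActionBlock<Tuple<int, int>> processorBlock;

    public Test()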
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, int>(i =>
            {
                return i;
            });

        transformBlock2 = new TransformBlock<int, int>(i =>
            {
                return i;
            });

        joinBlock = new JoinBlock<int, int>();

        processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
            {
                //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
            });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(joinBlock.Target1);
        transformBlock2.LinkTo(joinBlock.Target2);
        joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();

        const int numElements = 1000000;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.Post(i);
        }

        // Mark completion
        broadCastBlock.Complete();
        Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());


        processorBlock.Completion.Wait();

        watch.Stop();

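        // Use floating-point seconds: integer division of numElements by milliseconds truncates the rate.
        Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " ms - items processed per second: " + (long)(numElements / watch.Elapsed.TotalSeconds));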
        Console.ReadLine();
    }
}
asked Nov 21 '12 at 16:11 by Matt


1 Answer

I can see two ways to solve this:

  1. Use JoinBlock. Your broadcast block and both worker blocks will each send to one target of the join block. If a worker block doesn't have a result, it sends null (or some other special value) instead. Your worker blocks will need to change to TransformBlock<Foo, FooResult>, because using ActionBlock the way you do doesn't guarantee ordering (at least not when you set MaxDegreeOfParallelism), while TransformBlock does.

    The result of the JoinBlock would be a Tuple<Foo, FooResult, FooResult>, where any or both of the FooResults can be null.

    I'm not sure I like that this solution relies heavily on the correct ordering of items, though; that seems fragile to me.

  2. Use some other object for synchronization. That object is then responsible for sending the result forward once all blocks are done with a given item. This is similar to the NotificationWrapper suggested by Mario in his answer.

    You could use TaskCompletionSource and Task.WhenAll() to take care of synchronization in this case.
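One possible reading of option 2, sketched under assumptions: for each Foo, one `TaskCompletionSource` per worker acts as a slot that the worker completes exactly once, with a result or with `null`, and `Task.WhenAll` fires only when every slot is filled, so exactly one bundle per Foo goes forward. `Coordinator`, `ProcessAsync`, `Worker1`/`Worker2`, and the record shapes are hypothetical, and `Task.Run` merely stands in for the question's action blocks (in a Dataflow setup each block would receive the Foo together with its slot). This uses only the base class library, no Dataflow.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Placeholder shapes for the question's types (hypothetical).
public record Foo(int Id);
public record FooResult(Foo Source, string Worker);

public static class Coordinator
{
    // Hypothetical workers standing in for the two action blocks;
    // each may or may not create a FooResult for a given Foo.
    static FooResult? Worker1(Foo foo) => foo.Id % 2 == 0 ? new FooResult(foo, "w1") : null;
    static FooResult? Worker2(Foo foo) => foo.Id % 3 == 0 ? new FooResult(foo, "w2") : null;

    public static async Task<(Foo Original, List<FooResult> Results)> ProcessAsync(Foo foo)
    {
        var workers = new Func<Foo, FooResult?>[] { Worker1, Worker2 };

        // One completion slot per worker for this Foo.
        var slots = workers
            .Select(_ => new TaskCompletionSource<FooResult?>(TaskCreationOptions.RunContinuationsAsynchronously))
            .ToArray();

        for (int i = 0; i < workers.Length; i++)
        {
            var worker = workers[i];
            var slot = slots[i];
            // Every path must complete the slot exactly once, or WhenAll never finishes.
            _ = Task.Run(() =>
            {
                try { slot.SetResult(worker(foo)); }
                catch (Exception ex) { slot.SetException(ex); }
            });
        }

        // Completes only after every worker has reported, whatever order they finish in.
        FooResult?[] perWorker = await Task.WhenAll(slots.Select(s => s.Task));
        return (foo, perWorker.OfType<FooResult>().ToList());
    }

    public static async Task Main()
    {
        var bundles = await Task.WhenAll(Enumerable.Range(1, 6).Select(i => ProcessAsync(new Foo(i))));
        foreach (var (original, results) in bundles)
            Console.WriteLine($"Foo {original.Id}: {results.Count} result(s)");
    }
}
```

With these placeholder workers, Foo 6 comes back with two results and Foo 1 and Foo 5 with an empty list. Unlike the JoinBlock approach, this does not depend on items arriving in the same order from each worker, because the slots are created per Foo.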

answered Sep 21 '22 at 09:09 by svick