I am confronted with the following problem:
I have a data stream of `Foo` objects, and I stream those objects to several concurrent in-process tasks/threads, which in turn process the objects and output `FooResult` objects. Each `FooResult` contains, among other members, the same `Foo` that was used in its creation. However, not every `Foo` necessarily creates a `FooResult`.
My problem is that I want to pass on from this whole process a wrapping object that contains the original `Foo` and all `FooResult` objects, if any, that were created from that `Foo` within the concurrent tasks.
Note: I currently use TPL Dataflow, where each concurrent process runs within an `ActionBlock<Foo>` that is linked from a `BroadcastBlock<Foo>`. Each action block uses `SendAsync()` to send any `FooResult` it creates to a target dataflow block. Obviously, the concurrent dataflow blocks produce `FooResult`s at unpredictable times, which is what I currently struggle with. I cannot figure out how many `FooResult`s were created across all `ActionBlock<Foo>`s together, so that I can bundle them with the originating `Foo` and pass them on as a wrapping object.
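In pseudocode it currently looks as follows:
```csharp
ActionBlock<FooResult> targetBlock = new ActionBlock<FooResult>(fooResult => { /* consume the result */ });
BroadcastBlock<Foo> broadCastBlock = new BroadcastBlock<Foo>(foo => foo);

ActionBlock<Foo> aBlock1 = new ActionBlock<Foo>(async foo =>
{
    // Do something here. Sometimes this creates a FooResult; if so, send it on.
    FooResult fooResult = null;
    // ...
    if (fooResult != null)
        await targetBlock.SendAsync(fooResult);
});
// Similar for aBlock2.

// Link only after the blocks have been created.
broadCastBlock.LinkTo(aBlock1);
broadCastBlock.LinkTo(aBlock2);
```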
However, the problem with the current code is that `targetBlock` potentially receives nothing if a `Foo` did not produce a single `FooResult` in any of the action blocks. Also, `targetBlock` could receive two `FooResult` objects for the same `Foo` because each action block produced one.
What I want is for `targetBlock` to receive a wrapping object that contains each `Foo` and, if `FooResult` objects were created, also a collection of those `FooResult`s.
Any ideas what I could do to make the solution work as described? It does not have to use TPL Dataflow, but it would be neat if it did.
UPDATE: The following is what I got by implementing `JoinBlock` as suggested by svick. I am not going to use it (unless it can be tweaked performance-wise), because it is extremely slow: I get to about 89,000 items per second (and those are only `int` value types).
```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;

public class Test
{
    private BroadcastBlock<int> broadCastBlock;
    private TransformBlock<int, int> transformBlock1;
    private TransformBlock<int, int> transformBlock2;
    private JoinBlock<int, int, int> joinBlock;
    private ActionBlock<Tuple<int, int, int>> processorBlock;

    public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
        {
            return i;
        });
        transformBlock1 = new TransformBlock<int, int>(i =>
        {
            return i;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
        transformBlock2 = new TransformBlock<int, int>(i =>
        {
            return i;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
        joinBlock = new JoinBlock<int, int, int>();
        processorBlock = new ActionBlock<Tuple<int, int, int>>(tuple =>
        {
            //Console.WriteLine("original value: " + tuple.Item1 + "tfb1: " + tuple.Item2 + "tfb2: " + tuple.Item3);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        // Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(joinBlock.Target1, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(joinBlock.Target2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(joinBlock.Target3, new DataflowLinkOptions { PropagateCompletion = true });
        joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        const int numElements = 1000000;
        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.Post(i);
        }
        // Mark completion.
        broadCastBlock.Complete();
        processorBlock.Completion.Wait();
        watch.Stop();
        // Multiply before dividing so integer division does not truncate the rate to whole thousands.
        Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements * 1000L / watch.ElapsedMilliseconds);
        Console.ReadLine();
    }
}
```
Update of the code to reflect suggestions:
```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Test
{
    private BroadcastBlock<int> broadCastBlock;
    private TransformBlock<int, int> transformBlock1;
    private TransformBlock<int, int> transformBlock2;
    private JoinBlock<int, int> joinBlock;
    private ActionBlock<Tuple<int, int>> processorBlock;

    public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
        {
            return i;
        });
        transformBlock1 = new TransformBlock<int, int>(i =>
        {
            return i;
        });
        transformBlock2 = new TransformBlock<int, int>(i =>
        {
            return i;
        });
        joinBlock = new JoinBlock<int, int>();
        processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
        {
            //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
        });

        // Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        // No completion propagation here; the join block is completed explicitly in Start().
        transformBlock1.LinkTo(joinBlock.Target1);
        transformBlock2.LinkTo(joinBlock.Target2);
        joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();
        const int numElements = 1000000;
        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.Post(i);
        }
        // Mark completion.
        broadCastBlock.Complete();
        // Complete the join block only once both transform blocks have finished.
        Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());
        processorBlock.Completion.Wait();
        watch.Stop();
        // Multiply before dividing so integer division does not truncate the rate to whole thousands.
        Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements * 1000L / watch.ElapsedMilliseconds);
        Console.ReadLine();
    }
}
```
I can see two ways to solve this:
First, use `JoinBlock`. Your broadcast block and both worker blocks will each send to one target of the join block. If a worker block doesn't have any result, it sends `null` instead (or some other special value). Your worker blocks will need to change to `TransformBlock<Foo, FooResult>`, because using `ActionBlock` the way you do doesn't guarantee ordering (at least not when you set `MaxDegreeOfParallelism`), while `TransformBlock` does.
The result of the `JoinBlock` would be a `Tuple<Foo, FooResult, FooResult>`, where either or both of the `FooResult`s can be `null`.
That said, I'm not sure I like that this solution relies heavily on the correct ordering of items; that seems fragile to me.
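A minimal sketch of this first approach, assuming two worker functions that return `null` when they create nothing for a given `Foo`. The `JoinPipeline.Build` helper and the members of `Foo` and `FooResult` (`Value`, `Source`, `Worker`) are hypothetical; only the type names come from the question. The join block is completed only after all three of its sources have finished, mirroring the `Task.WhenAll(...).ContinueWith(...)` in the question's updated code.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-ins: only the type names come from the question.
public class Foo { public int Value; }
public class FooResult { public Foo Source; public int Worker; }

public static class JoinPipeline
{
    // work1/work2 return a FooResult, or null when they create nothing for that Foo.
    public static (ITargetBlock<Foo> Input, ISourceBlock<Tuple<Foo, FooResult, FooResult>> Output) Build(
        Func<Foo, FooResult> work1, Func<Foo, FooResult> work2)
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        var broadcast = new BroadcastBlock<Foo>(foo => foo);
        // TransformBlock emits its outputs in input order, which the join relies on.
        var worker1 = new TransformBlock<Foo, FooResult>(work1);
        var worker2 = new TransformBlock<Foo, FooResult>(work2);
        var join = new JoinBlock<Foo, FooResult, FooResult>();

        broadcast.LinkTo(worker1, link);
        broadcast.LinkTo(worker2, link);
        broadcast.LinkTo(join.Target1);   // the original Foo
        worker1.LinkTo(join.Target2);     // worker 1's result or null
        worker2.LinkTo(join.Target3);     // worker 2's result or null

        // Complete (or fault) the join only after all three of its sources have finished.
        Task.WhenAll(broadcast.Completion, worker1.Completion, worker2.Completion)
            .ContinueWith(t =>
            {
                if (t.IsFaulted) ((IDataflowBlock)join).Fault(t.Exception);
                else join.Complete();
            });

        return (broadcast, join);
    }
}
```

Because both workers emit exactly one value per `Foo` and keep input order, the n-th tuple pairs the n-th `Foo` with both workers' outputs for it. That is also why the approach is fragile: if a worker ever skipped or reordered an item, every later tuple would be misaligned.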
Second, use some other object for synchronization. That object will then be responsible for sending the result forward once all blocks are done with a certain item. This is similar to the `NotificationWrapper` suggested by Mario in his answer.
You could use `TaskCompletionSource` and `Task.WhenAll()` to take care of synchronization in this case.
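A minimal sketch of this second approach, assuming each worker is a `Func<Foo, FooResult>` that returns `null` when it creates nothing. The envelope, wrapper, and pipeline types (`FooEnvelope`, `FooWithResults`, `EnvelopePipeline`) and the members of `Foo` and `FooResult` are hypothetical. Each `Foo` travels through the `BroadcastBlock` inside an envelope that holds one `TaskCompletionSource` per worker; a final `TransformBlock` awaits `Task.WhenAll` over those slots and emits one wrapper per `Foo` containing only the non-null results.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-ins: only the type names come from the question.
public class Foo { public int Value; }
public class FooResult { public Foo Source; public int Worker; }

// Hypothetical wrapping object: the original Foo plus every FooResult created from it.
public class FooWithResults
{
    public Foo Foo;
    public List<FooResult> Results;
}

// Synchronization object that travels through the BroadcastBlock with each Foo.
public class FooEnvelope
{
    public Foo Foo { get; }
    public TaskCompletionSource<FooResult>[] Slots { get; }

    public FooEnvelope(Foo foo, int workerCount)
    {
        Foo = foo;
        // RunContinuationsAsynchronously keeps the awaiting code off the worker threads.
        Slots = Enumerable.Range(0, workerCount)
            .Select(_ => new TaskCompletionSource<FooResult>(TaskCreationOptions.RunContinuationsAsynchronously))
            .ToArray();
    }

    // Completes once every worker has reported, keeping only the non-null results.
    public async Task<FooWithResults> WhenAllReported()
    {
        FooResult[] results = await Task.WhenAll(Slots.Select(slot => slot.Task));
        return new FooWithResults { Foo = Foo, Results = results.Where(r => r != null).ToList() };
    }
}

public static class EnvelopePipeline
{
    // Each worker returns a FooResult, or null when it creates nothing for that Foo.
    public static (ITargetBlock<Foo> Input, ISourceBlock<FooWithResults> Output) Build(
        IReadOnlyList<Func<Foo, FooResult>> workers)
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        var wrap = new TransformBlock<Foo, FooEnvelope>(foo => new FooEnvelope(foo, workers.Count));
        // Identity clone: every linked block sees the same envelope instance.
        var broadcast = new BroadcastBlock<FooEnvelope>(envelope => envelope);
        // Awaits all slots of each envelope; TransformBlock keeps outputs in input order.
        var forward = new TransformBlock<FooEnvelope, FooWithResults>(envelope => envelope.WhenAllReported());

        wrap.LinkTo(broadcast, link);
        broadcast.LinkTo(forward, link);
        for (int i = 0; i < workers.Count; i++)
        {
            int slot = i; // copy so each lambda captures its own index
            Func<Foo, FooResult> work = workers[i];
            var worker = new ActionBlock<FooEnvelope>(envelope =>
            {
                try { envelope.Slots[slot].TrySetResult(work(envelope.Foo)); }
                catch (Exception ex) { envelope.Slots[slot].TrySetException(ex); }
            });
            broadcast.LinkTo(worker, link);
        }
        return (wrap, forward);
    }
}
```

Since every worker reports for every `Foo` (with `null` when it has nothing), a wrapper can never be emitted early or twice, and nothing depends on the workers staying in step with each other. The forwarding `TransformBlock` still emits wrappers in input order; it simply waits for the slowest worker on each item.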