Linking dynamically created ActionBlocks to a BufferBlock

I'm not sure if this is possible, but if it is, I'm probably not doing it right. Let's suppose I have one shared buffer that is linked to many consumers (ActionBlocks). Each consumer should consume data that satisfies a predicate used to link it to the buffer. For example, ActionBlock1 should consume numbers that satisfy x => x % 5 == 0, ActionBlock2 should consume only x => x % 5 == 1 etc.

Here's what I've got:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();

    for (int i = 0; i < NumProductionLines; i++)
    {
        ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));

        productionQueue.LinkTo(productionLine, x => x % NumProductionLines == i);
    }

    return productionQueue;
}

And then I call:

Random rnd = new Random();

ITargetBlock<int> temp = BuildPipeline(5);

while (true)
{
    temp.Post(rnd.Next(255));
}

However, this does not work: no output is displayed in the console. If I modify the BuildPipeline method as follows:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();

    ActionBlock<int> productionLine1 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 1, num));
    ActionBlock<int> productionLine2 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 2, num));
    ActionBlock<int> productionLine3 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 3, num));
    ActionBlock<int> productionLine4 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 4, num));
    ActionBlock<int> productionLine5 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 5, num));

    productionQueue.LinkTo(productionLine1, x => x % 5 == 0);
    productionQueue.LinkTo(productionLine2, x => x % 5 == 1);
    productionQueue.LinkTo(productionLine3, x => x % 5 == 2);
    productionQueue.LinkTo(productionLine4, x => x % 5 == 3);
    productionQueue.LinkTo(productionLine5, x => x % 5 == 4);

    return productionQueue;
}

the code does what it's expected to do.

Can someone shed light on why dynamically creating and linking action blocks doesn't work?

P.S. If I break into the debugger right after ITargetBlock<int> temp = BuildPipeline(5); temp does show that 5 targets are linked to the buffer, and the Id of each target is different.

Thanks in advance

EDIT: Added changes suggested by svick but still no good:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();
    var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

    for (int i = 0; i < NumProductionLines; i++)
    {
        ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));

        int j = i;
        productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j);
    }

    ActionBlock<int> discardedLine = new ActionBlock<int>(num => Console.WriteLine("Discarded: {0}", num));
    productionQueue.LinkTo(discardedLine);

    return productionQueue;
}

Now only the second production line processes data (the one linked with the x % 5 == 1 predicate), and the data it receives does not satisfy the predicate: I get numbers ending in 9 and 7.

EDIT: Working code would look something like the following:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();
    var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

    for (int i = 0; i < NumProductionLines; i++)
    {
        int j = i;
        ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", j + 1, num));

        productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j);
    }

    productionQueue.LinkTo(DataflowBlock.NullTarget<int>());

    return productionQueue;
}

Dimitri asked Sep 28 '12 20:09

1 Answer

The problem is that in your first version, you're using the same predicate for each target block. In other words, the predicate doesn't depend on i.

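But even if it did, your code wouldn't work, because the i variable is shared among the predicates, so they will all use its last value. By the time any predicate runs, the loop has finished and i equals NumProductionLines, which x % NumProductionLines can never equal. The fix for that is to copy i into a local variable declared inside the loop and use that in the predicate.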

The code could look like this:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
    var productionQueue = new BufferBlock<int>();

    for (int i = 0; i < NumProductionLines; i++)
    {
        int iCopy = i;

        ActionBlock<int> productionLine = new ActionBlock<int>(
            num => Console.WriteLine("Processed by line {0}: {1}", iCopy + 1, num));

        productionQueue.LinkTo(
            productionLine, x => x % NumProductionLines == iCopy);
    }

    return productionQueue;
}
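
To see the shared-variable effect in isolation, here is a minimal sketch that uses no Dataflow types at all. The class and method names (ClosureCaptureDemo, CaptureValues) are made up for illustration; the only thing it relies on is that a C# for loop has a single loop variable that every lambda captures.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ClosureCaptureDemo
{
    // Builds one delegate per iteration and returns what each delegate
    // reads once the loop has finished.
    public static int[] CaptureValues(int count, bool copyToLocal)
    {
        var readers = new List<Func<int>>();

        for (int i = 0; i < count; i++)
        {
            if (copyToLocal)
            {
                int iCopy = i;              // a fresh variable for each iteration
                readers.Add(() => iCopy);
            }
            else
            {
                readers.Add(() => i);       // every lambda shares the one loop variable
            }
        }

        // At this point the shared i equals count.
        return readers.Select(read => read()).ToArray();
    }

    public static void Main()
    {
        // Shared variable: prints 5, 5, 5, 5, 5
        Console.WriteLine(string.Join(", ", CaptureValues(5, copyToLocal: false)));

        // Per-iteration copy: prints 0, 1, 2, 3, 4
        Console.WriteLine(string.Join(", ", CaptureValues(5, copyToLocal: true)));
    }
}
```

With the shared variable every delegate reads 5, which is why a predicate of the form x % 5 == i can never match anything.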

If you're asking why your code doesn't process at least the x % 5 == 1 numbers, that's because the random generator will most likely produce a number that doesn't match that predicate first, so none of the ActionBlocks will accept it. A BufferBlock offers its items in order, so that number stays at the head of the source block's output queue and no later numbers can get through.
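
The blocking described above can be reproduced with two items and a single filtered link. This is only a sketch: the names (BlockedHeadDemo, Run) are hypothetical, the 200 ms sleep is an arbitrary pause to let the blocks run, and it assumes the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class BlockedHeadDemo
{
    // Posts an item that no target accepts, followed by one that would be
    // accepted, and reports how many items are still buffered and how many
    // were processed.
    public static (int buffered, int received) Run()
    {
        int received = 0;
        var buffer = new BufferBlock<int>();
        var evens = new ActionBlock<int>(n => Interlocked.Increment(ref received));

        buffer.LinkTo(evens, x => x % 2 == 0);

        buffer.Post(1); // matches no predicate, so it stays at the head of the queue
        buffer.Post(2); // matches, but is stuck behind 1

        Thread.Sleep(200); // arbitrary pause so the blocks get a chance to run

        return (buffer.Count, Volatile.Read(ref received));
    }

    public static void Main()
    {
        var (buffered, received) = Run();
        Console.WriteLine("buffered={0}, received={1}", buffered, received);
    }
}
```

Both items should still be sitting in the buffer and the ActionBlock should have processed nothing, even though 2 satisfies its predicate.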

If a similar situation might happen in your real code and you want to discard all numbers that don't fit any of the predicates, you can link the source block to a block that accepts anything and does nothing. Add that link after you have linked all your useful blocks:

productionQueue.LinkTo(DataflowBlock.NullTarget<int>());
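
Putting the pieces together, the following sketch routes a fixed input through predicate-filtered links with a NullTarget as the catch-all, and collects results so they can be inspected. It goes slightly beyond the snippet above: the names (RoutingDemo, RouteAsync), the result buckets and the use of PropagateCompletion to wait for the pipeline to drain are assumptions added to make the example terminate, not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class RoutingDemo
{
    // Routes each input to the line whose index equals x % lines and
    // returns what each line received.
    public static async Task<int[][]> RouteAsync(int lines, int[] input)
    {
        var buckets = Enumerable.Range(0, lines)
                                .Select(_ => new ConcurrentQueue<int>())
                                .ToArray();
        var queue = new BufferBlock<int>();
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var completions = new Task[lines];

        for (int i = 0; i < lines; i++)
        {
            int j = i; // per-iteration copy used by both lambdas
            var line = new ActionBlock<int>(n => buckets[j].Enqueue(n));
            queue.LinkTo(line, linkOptions, x => x % lines == j);
            completions[i] = line.Completion;
        }

        // Linked last, so it only sees items every filtered link declined
        // (for example negative numbers, whose remainder is negative in C#).
        queue.LinkTo(DataflowBlock.NullTarget<int>());

        foreach (int n in input)
        {
            queue.Post(n);
        }

        queue.Complete();
        await Task.WhenAll(completions);

        return buckets.Select(b => b.ToArray()).ToArray();
    }

    public static async Task Main()
    {
        int[][] result = await RouteAsync(5, new[] { 0, 1, 2, 7, -3, 10 });
        for (int i = 0; i < result.Length; i++)
        {
            Console.WriteLine("Line {0}: {1}", i + 1, string.Join(", ", result[i]));
        }
    }
}
```

Without the NullTarget link, the -3 in this input would stay at the head of the buffer, the buffer would never finish, and the awaited completions would never be signalled.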

svick answered Sep 22 '22 10:09
