
Alternative to Dataflow BroadcastBlock with guaranteed delivery

I need an object that acts like a BroadcastBlock but guarantees delivery, so I used an answer from this question. However, I don't clearly understand the execution flow. I have a console app; here is my code:

static void Main(string[] args)
{
    ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
    List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();

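    for (int i = 0; i <= 10; i++)
    {
        // Copy the loop variable outside the lambda; capturing i directly would make
        // every block read the same shared variable (11 once the loop has finished).
        int coef = i;
        blocks.Add(new ActionBlock<int>(num =>
        {
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef);
        }, execopt));
    }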

    ActionBlock<int> broadcaster = new ActionBlock<int>(async num => 
    {
        foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
    }, execopt);

    broadcaster.Completion.ContinueWith(task =>
        {
            foreach (ActionBlock<int> block in blocks) block.Complete();
        });

    Task producer = Produce(broadcaster);
    List<Task> ToWait = new List<Task>();
    foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
    ToWait.Add(producer);

    Task.WaitAll(ToWait.ToArray());

    Console.ReadLine();
}

static async Task Produce(ActionBlock<int> broadcaster)
{
    for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);

    broadcaster.Complete();
}

Each number must be handled sequentially, so I can't use MaxDegreeOfParallelism in the broadcaster block. However, all the ActionBlocks that receive the number can run in parallel.

So here is the question:

In the output I can see different thread IDs. Do I understand correctly that it works as follows?

Execution hits await block.SendAsync(num); in the broadcaster. If the current block is not ready to accept the number, execution leaves the broadcaster and the main thread keeps waiting at Task.WaitAll. When the block accepts the number, the rest of the foreach statement in the broadcaster runs on a thread pool thread, and so on until the end. Each iteration of the foreach runs on the thread pool, but the iterations still happen sequentially.

Am I right or wrong in my understanding? How can I change this code to send the number to all blocks asynchronously?

That is, if one of the blocks is not ready to receive the number at the moment, I don't want to wait for it; all the others that are ready should still receive the number. All blocks should be able to run in parallel, and delivery must be guaranteed.

asked Aug 01 '14 14:08 by shda


1 Answer

Assuming you want the broadcaster to handle one item at a time while the target blocks receive that item concurrently, change the broadcaster to offer the number to all blocks at once and then asynchronously wait for all of them to accept it before moving on to the next number:

var broadcaster = new ActionBlock<int>(async num => 
{
    var tasks = new List<Task>();
    foreach (var block in blocks)
    {
        tasks.Add(block.SendAsync(num));
    }
    await Task.WhenAll(tasks);
}, execopt);
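
To see why the concurrent offer matters, here is a minimal sketch that is not taken from the original posts. The names OfferDemo, ReadyBlockGetsItem, and SendSequentially, the gate that keeps one target full, and the one-second timeout are all assumptions made for illustration. It keeps one target's single slot occupied and checks whether a second, idle target still receives the item:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class OfferDemo
{
    // Hypothetical helper: the question's approach, awaiting each target in turn.
    static async Task SendSequentially(int item, params ITargetBlock<int>[] targets)
    {
        foreach (var target in targets) await target.SendAsync(item);
    }

    // Returns true if the idle block received the item while the other block was still full.
    public static async Task<bool> ReadyBlockGetsItem(bool concurrentOffer)
    {
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var readyGotItem = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };

        // Stalled target: its handler waits on the gate, so its single slot stays occupied.
        var stalled = new ActionBlock<int>(async _ => { await gate.Task; }, options);
        // Idle target: records that it received something.
        var ready = new ActionBlock<int>(_ => { readyGotItem.TrySetResult(true); }, options);

        await stalled.SendAsync(0); // fill the stalled block's only slot

        // Offer item 1 either to both targets at once or one target after another.
        Task broadcast = concurrentOffer
            ? Task.WhenAll(stalled.SendAsync(1), ready.SendAsync(1))
            : SendSequentially(1, stalled, ready);

        // Assumed timeout: give the idle block up to one second to receive the item.
        Task winner = await Task.WhenAny(readyGotItem.Task, Task.Delay(1000));
        bool readyReceived = winner == readyGotItem.Task;

        gate.SetResult(true); // release the stalled block so the broadcast can finish
        await broadcast;
        return readyReceived;
    }

    public static async Task Main()
    {
        Console.WriteLine("sequential await: " + await ReadyBlockGetsItem(false));
        Console.WriteLine("Task.WhenAll:     " + await ReadyBlockGetsItem(true));
    }
}
```

Under these assumptions, the sequential version should report False, because the idle block is never offered the item while the first block is full, and the Task.WhenAll version should report True. In both cases no item is dropped: the postponed SendAsync completes once the stalled block frees its slot.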

Since nothing in the broadcaster's delegate runs after the await, you can slightly optimize by returning the Task.WhenAll task directly instead of awaiting it (Select requires System.Linq):

ActionBlock<int> broadcaster = new ActionBlock<int>(
    num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);
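
For completeness, the following is a rough end-to-end sketch of how this broadcaster could be wired into a small program to check that every target receives every number in order. It is an illustration rather than part of the original answer: the GuaranteedBroadcast class, the Run method, the received lists, and the Task.Delay workload are hypothetical names and values chosen for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class GuaranteedBroadcast
{
    // Broadcasts 0..itemCount-1 to targetCount bounded blocks and returns what each block received.
    public static async Task<List<int>[]> Run(int targetCount, int itemCount)
    {
        var options = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
        var received = new List<int>[targetCount];
        var blocks = new List<ActionBlock<int>>();

        for (int i = 0; i < targetCount; i++)
        {
            int index = i; // copy, so the lambda does not capture the shared loop variable
            received[index] = new List<int>();
            blocks.Add(new ActionBlock<int>(async num =>
            {
                await Task.Delay(index);  // hypothetical workload: higher index = slower consumer
                received[index].Add(num); // each block handles one item at a time by default
            }, options));
        }

        // The broadcaster from the answer: offer the number to all targets, finish when all accept.
        var broadcaster = new ActionBlock<int>(
            num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), options);

        for (int n = 0; n < itemCount; n++) await broadcaster.SendAsync(n);
        broadcaster.Complete();
        await broadcaster.Completion;

        // Only complete the targets after the broadcaster has delivered everything.
        foreach (var block in blocks) block.Complete();
        await Task.WhenAll(blocks.Select(block => block.Completion));
        return received;
    }

    public static async Task Main()
    {
        var received = await Run(targetCount: 3, itemCount: 16);
        for (int i = 0; i < received.Length; i++)
            Console.WriteLine("block " + i + ": " + string.Join(",", received[i]));
    }
}
```

Because the broadcaster processes one number at a time and waits for every target to accept it, each list should contain 0 through 15 in order, even though the targets run at different speeds.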

answered Sep 28 '22 11:09 by i3arnon