Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to create never ending DataFlow Mesh with exception handling?

I am creating a Task processor which uses TPL DataFlow. I will follow a producer consumer model where in Producer produces some items to be processed once in a while and consumers keep waiting for new items to arrive. Here is my code:

async Task Main()
{
    var runner = new Runner();
    CancellationTokenSource cts = new CancellationTokenSource();
    Task runnerTask = runner.ExecuteAsync(cts.Token);

    await Task.WhenAll(runnerTask);
}

public class Runner
{
    public async Task ExecuteAsync(CancellationToken cancellationToken) {
        var random = new Random();

        ActionMeshProcessor processor = new ActionMeshProcessor();
        await processor.Init(cancellationToken);

        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more

            int[] items = GetItems(random.Next(3, 7));

            await processor.ProcessBlockAsync(items);
        }
    }

    private int[] GetItems(int count)
    {
        Random randNum = new Random();

        int[] arr = new int[count];
        for (int i = 0; i < count; i++)
        {
            arr[i] = randNum.Next(10, 20);
        }

        return arr;
    }
}

public class ActionMeshProcessor
{
    private TransformBlock<int, int> Transformer { get; set; }
    private ActionBlock<int> CompletionAnnouncer { get; set; }

    public async Task Init(CancellationToken cancellationToken)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = 5,
            BoundedCapacity = 5
        };


        this.Transformer = new TransformBlock<int, int>(async input => {

            await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!

            if (input > 15)
            {
                throw new Exception($"I can't handle this number: {input}");
            }

            return input + 1;
        }, options);

        this.CompletionAnnouncer = new ActionBlock<int>(async input =>
        {
            Console.WriteLine($"Completed: {input}");

            await Task.FromResult(0);
        }, options);

        this.Transformer.LinkTo(this.CompletionAnnouncer);

        await Task.FromResult(0); // what do I await here?
    }

    public async Task ProcessBlockAsync(int[] arr)
    {
        foreach (var item in arr)
        {
            await this.Transformer.SendAsync(item); // await if there are no free slots
        }       
    }
}

I added a condition check above to throw an exception to mimic an exceptional case.

Here are my questions:

  • What is the best way I can handle exceptions in the above mesh without bringing the whole mesh down?

  • Is there a better way to initialize/start/continue a never ending DataFlow mesh?

  • Where do I await Completion?

I have looked in to this similar question

like image 229
Amit Avatar asked Apr 24 '17 22:04

Amit


1 Answers

Exceptions

There's nothing asynchronous in your init it could be a standard synchronous constructor. You can handle exceptions in your mesh without taking the mesh down with a simple try catch in the lamda you provide to the block. You can then handle that case by either filtering the result from your mesh or ignoring the result in the following blocks. Below is an example of filtering. For the simple case of an int you can use an int? and filter out any value that was null or of course you could set any type of magic indicator value if you like. If your actually passing around a reference type you can either push out null or mark the data item as dirty in way that can be examined by the predicate on your link.

public class ActionMeshProcessor {
    private TransformBlock<int, int?> Transformer { get; set; }
    private ActionBlock<int?> CompletionAnnouncer { get; set; }

    public ActionMeshProcessor(CancellationToken cancellationToken) {
        var options = new ExecutionDataflowBlockOptions {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = 5,
            BoundedCapacity = 5
        };


        this.Transformer = new TransformBlock<int, int?>(async input => {
            try {
                await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!

                if (input > 15) {
                    throw new Exception($"I can't handle this number: {input}");
                }

                return input + 1;
            } catch (Exception ex) {
                return null;
            }

        }, options);

        this.CompletionAnnouncer = new ActionBlock<int?>(async input =>
        {
            if (input == null) throw new ArgumentNullException("input");

            Console.WriteLine($"Completed: {input}");

            await Task.FromResult(0);
        }, options);

        //Filtering
        this.Transformer.LinkTo(this.CompletionAnnouncer, x => x != null);
        this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>());
    }

    public async Task ProcessBlockAsync(int[] arr) {
        foreach (var item in arr) {
            await this.Transformer.SendAsync(item); // await if there are no free slots
        }
    }
}

Completion

You can expose Complete() and Completion from your processor and use those to await the completion when your app shutsdown, assuming thats the only time you'd shutdown the mesh. Also, make sure you propagate completion through your links properly.

    //Filtering
    this.Transformer.LinkTo(this.CompletionAnnouncer, new DataflowLinkOptions() { PropagateCompletion = true }, x => x != null);
    this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>());
}        

public void Complete() {
    Transformer.Complete();
}

public Task Completion {
    get { return CompletionAnnouncer.Completion; }
}

Then, based on your sample the most likely place for completion is outside the loop driving your processing:

public async Task ExecuteAsync(CancellationToken cancellationToken) {
    var random = new Random();

    ActionMeshProcessor processor = new ActionMeshProcessor();
    await processor.Init(cancellationToken);

    while (!cancellationToken.IsCancellationRequested) {
        await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more

        int[] items = GetItems(random.Next(3, 7));

        await processor.ProcessBlockAsync(items);
    }
    //asuming you don't intend to throw from cancellation
    processor.Complete();
    await processor.Completion();

}
like image 136
JSteward Avatar answered Sep 26 '22 10:09

JSteward