Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

TPL.Dataflow - preventing hangs when unhanded exception occurs in ActionBlock<T>

Just starting with System.Threading.Tasks.Dataflow and not sure I understand proper error handling technique for unhandled exceptions in the ActionBlock.

What I have right now leads to hang: - ActionBlock had unhandled exception and is no longer processing - producer is not able to complete because it's over BoundedCapacity

Here is the code that I have (it's simplified to show one consumer).

internal class Program
{
    private static int _processCounter = 0;

    internal class MyClass
    {
        public MyClass(int id)
        {
            this.Id = id;
        }

        internal int Id { get; set; }
    }

    private static void Main(string[] args)
    {
        BufferBlock<MyClass> queue = new BufferBlock<MyClass>(new DataflowBlockOptions {BoundedCapacity = 10,});

        ActionBlock<MyClass> consumer =
            new ActionBlock<MyClass>(record => Process(record),
                new ExecutionDataflowBlockOptions {BoundedCapacity = 1,});

        queue.LinkTo(consumer, new DataflowLinkOptions {PropagateCompletion = true,});

        Task producer = Produce(queue);

        Trace.TraceInformation("Starting to wait on producer and consumer...");

        Task.WhenAll(producer, consumer.Completion).Wait(); // <-- this will hang. consumer.Completion is faulted, but producer is still "running".

    }

    private static async Task Produce(BufferBlock<MyClass> queue)
    {
        for (int i = 0; i < 20; i++)
        {
            await queue.SendAsync(new MyClass(i));
            Trace.TraceInformation("Sending object number {0}", i);
            await Task.Delay(1);
        }
        Trace.TraceInformation("Completing the producer");
        queue.Complete();
            // <-- we never get here because one of the SendAsync will be waiting to not excede BoundedCapacity = 10
    }

    private static void Process(MyClass myClass)
    {
        int counter = Interlocked.Increment(ref _processCounter);
        Trace.TraceInformation("Processing object number {0}", myClass.Id);
        if (counter > 4)
        {
            Trace.TraceInformation("About to throw exception for object {0}", myClass.Id);
            throw new ArgumentException("Something bad happened");
        }
    }
}

Output:

ConsoleApplication5.vshost.exe Information: 0 : Sending object number 0
ConsoleApplication5.vshost.exe Information: 0 : Starting to wait on producer and consumer...
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 1
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 0
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 2
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 3
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 1
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 4
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 2
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 3
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 5
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 4
ConsoleApplication5.vshost.exe Information: 0 : About to throw exception for object 4
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 6
A first chance exception of type 'System.ArgumentException' occurred in ConsoleApplication5.exe
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 7
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 8
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 9
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 10
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 11
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 12
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 13
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 14
<never finishes>

The question is, what is the proper way to wait for such execution to ensure it either completes or propagates exception. Thanks!

like image 791
Alexey Yeltsov Avatar asked Dec 06 '25 12:12

Alexey Yeltsov


1 Answers

There's a lot you can do since it's about how you structure your code. The simplest is probably to use a CancellationToken for the producer and wait for the consumer first:

private static void Main(string[] args)
{
    // ...

    var cts = new CancellationTokenSource();
    Task producer = Produce(queue, cts.Token);

    Trace.TraceInformation("Starting to wait on producer and consumer...");
    try
    {
        await consumer.Completion;
    }
    catch
    {
        cts.Cancel();
        // handle
    }

    try
    {
        await producer
    }
    catch
    {
        // handle
    }
}

private static async Task Produce(BufferBlock<MyClass> queue, CancellationToken token)
{
    for (int i = 0; i < 20; i++)
    {
        await queue.SendAsync(new MyClass(i), token);
        Trace.TraceInformation("Sending object number {0}", i);
        await Task.Delay(1);
    }
    Trace.TraceInformation("Completing the producer");
    queue.Complete();
}
like image 191
i3arnon Avatar answered Dec 08 '25 00:12

i3arnon



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!