Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Deadlocks when using BlockingCollection<T> and TPL dataflow together

I've written a sample test that replicates the issue. This is not my actual code, I've tried to write a small repro. If you increase the bounding capacity to the number of iterations effectively giving it no bounding it does not deadlock and if you put the max parallelism to a small number like 1 it does not deadlock.

Again, I know the code below is not great but the code I actually found this in was much larger and hard to understand. Basically there was a blocking object pool of connections to a remote resource and several of the blocks in the flow used the connection.

Any ideas on how to solve this? At first glance it appears to be a problem with dataflow. When I break to take a look at the threads I see many threads blocked on Add and 0 threads blocked on take. There are several items in the addBlocks outbound queue that have not yet propagated to the takeblock so it's stuck or deadlocked.

    var blockingCollection = new BlockingCollection<int>(10000);

    var takeBlock = new ActionBlock<int>((i) =>
    {
        int j = blockingCollection.Take();

    }, new ExecutionDataflowBlockOptions()
           {
              MaxDegreeOfParallelism = 20,
              SingleProducerConstrained = true
           });

    var addBlock = new TransformBlock<int, int>((i) => 
    {
        blockingCollection.Add(i);
        return i;

    }, new ExecutionDataflowBlockOptions()
           {
              MaxDegreeOfParallelism = 20
           });

    addBlock.LinkTo(takeBlock, new DataflowLinkOptions()
          {
             PropagateCompletion = true
          });

    for (int i = 0; i < 100000; i++)
    {
        addBlock.Post(i);
    }

    addBlock.Complete();
    await addBlock.Completion;
    await takeBlock.Completion;
like image 957
Aaron Stainback Avatar asked Oct 20 '22 07:10

Aaron Stainback


1 Answers

TPL Dataflow wasn't meant to be used with code that is blocking a lot, and I think this issue stems from that.

I couldn't figure out what exactly is going on, but I think a solution would be to use a non-blocking collection. Conveniently, Dataflow provides you with one in the form of BufferBlock. With that, your code would look like this:

var bufferBlock = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10000 });

var takeBlock = new ActionBlock<int>(
    async i =>
    {
        int j = await bufferBlock.ReceiveAsync();
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 20,
        SingleProducerConstrained = true
    });

var addBlock = new TransformBlock<int, int>(
    async i =>
    {
        await bufferBlock.SendAsync(i);
        return i;
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 20
    });

Although I find the whole design of your code suspicious. If you want to send some additional data along with the normal result of a block, change the type of the output of that block to a type that includes that additional data.

like image 182
svick Avatar answered Oct 27 '22 08:10

svick