I've written a sample test that replicates the issue. This is not my actual code; I've tried to write a small repro. If you increase the bounded capacity to the number of iterations, which effectively removes the bound, it does not deadlock. If you set the max degree of parallelism to a small number like 1, it does not deadlock either.
Again, I know the code below is not great, but the code I actually found this in was much larger and hard to understand. Basically, there was a blocking object pool of connections to a remote resource, and several of the blocks in the flow used a connection.
Any ideas on how to solve this? At first glance it appears to be a problem with Dataflow. When I break in the debugger and look at the threads, I see many threads blocked on Add and 0 threads blocked on Take. There are several items in addBlock's output queue that have not yet propagated to takeBlock, so it's stuck or deadlocked.
var blockingCollection = new BlockingCollection<int>(10000);
var takeBlock = new ActionBlock<int>((i) =>
{
    int j = blockingCollection.Take();
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 20,
    SingleProducerConstrained = true
});
var addBlock = new TransformBlock<int, int>((i) =>
{
    blockingCollection.Add(i);
    return i;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 20
});
addBlock.LinkTo(takeBlock, new DataflowLinkOptions()
{
    PropagateCompletion = true
});
for (int i = 0; i < 100000; i++)
{
    addBlock.Post(i);
}
addBlock.Complete();
await addBlock.Completion;
await takeBlock.Completion;
TPL Dataflow wasn't designed for code that blocks a lot, and I think this issue stems from that.
I couldn't figure out what exactly is going on, but I think a solution would be to use a non-blocking collection. Conveniently, Dataflow provides you with one in the form of BufferBlock. With that, your code would look like this:
var bufferBlock = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10000 });
var takeBlock = new ActionBlock<int>(
    async i =>
    {
        int j = await bufferBlock.ReceiveAsync();
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 20,
        SingleProducerConstrained = true
    });
var addBlock = new TransformBlock<int, int>(
    async i =>
    {
        await bufferBlock.SendAsync(i);
        return i;
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 20
    });
That said, I find the whole design of your code suspicious. If you want to send some additional data along with the normal result of a block, change that block's output type to one that includes the additional data.
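To illustrate that last suggestion, here is a minimal sketch, not a drop-in fix for the real code. It assumes the extra value can be computed inside the first block; the names RunAsync, Item, Extra and processed are made up for the example, and it needs the System.Threading.Tasks.Dataflow package and C# 7 tuples. The side collection disappears because the extra value travels with the message itself:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    // Hypothetical pipeline: the extra value is part of the block's output,
    // so no shared BlockingCollection or BufferBlock is needed on the side.
    public static async Task<int> RunAsync(int count)
    {
        int processed = 0;

        var takeBlock = new ActionBlock<(int Item, int Extra)>(
            pair =>
            {
                // pair.Extra stands in for the value the original code
                // obtained by calling Take() on the shared collection.
                Interlocked.Increment(ref processed);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 20 });

        var addBlock = new TransformBlock<int, (int Item, int Extra)>(
            // Assumption for the sketch: the extra data is just the input again.
            i => (Item: i, Extra: i),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 20 });

        addBlock.LinkTo(takeBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            addBlock.Post(i);
        }

        // Completion propagates through the link, so awaiting the last block is enough.
        addBlock.Complete();
        await takeBlock.Completion;
        return processed;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync(100000));
    }
}
```

Neither delegate blocks or waits on shared state here, so the blocks only interact through the link between them.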