While working on an answer to this question, I wrote this snippet:
var buffer = new BufferBlock<object>(); var producer = Task.Run(async () => { while (true) { await Task.Delay(TimeSpan.FromMilliseconds(100)); buffer.Post(null); Console.WriteLine("Post " + buffer.Count); } }); var consumer = Task.Run(async () => { while (await buffer.OutputAvailableAsync()) { IList<object> items; buffer.TryReceiveAll(out items); Console.WriteLine("TryReceiveAll " + buffer.Count); } }); await Task.WhenAll(consumer, producer);
The producer should post items to the buffer every 100 ms and the consumer should clear all items out of the buffer and asynchronously wait for more items to show up.
What actually happens is that the producer clears all items once, and then never again moves beyond OutputAvailableAsync
. If I switch the consumer to remove items one by one it works as excepted:
while (await buffer.OutputAvailableAsync()) { object item; while (buffer.TryReceive(out item)) ; }
Am I misunderstanding something? If not, what is the problem?
This is a bug in SourceCore
being used internally by BufferBlock
. Its TryReceiveAll
method doesn't turn on the _enableOffering
boolean data member while TryReceive
does. That results in the task returned from OutputAvailableAsync
never completing.
Here's a minimal reproduce:
var buffer = new BufferBlock<object>(); buffer.Post(null); IList<object> items; buffer.TryReceiveAll(out items); var outputAvailableAsync = buffer.OutputAvailableAsync(); buffer.Post(null); await outputAvailableAsync; // Never completes
I've just fixed it in the .Net core repository with this pull request. Hopefully the fix finds itself in the nuget package soon.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With