I have a quantifiable & repeatable problem using the Task Parallel Library, BlockingCollection<T>, ConcurrentQueue<T> & GetConsumingEnumerable while trying to create a simple pipeline.

In a nutshell, adding entries to a default BlockingCollection<T> (which under the hood relies on a ConcurrentQueue<T>) from one thread does not guarantee that they will be popped off the BlockingCollection<T> by another thread calling the GetConsumingEnumerable() method.
I've created a very simple WinForms application to reproduce this; it just prints integers to the screen.

Timer1 is responsible for queueing up the work items. It uses a concurrent dictionary called _tracker so that it knows what it has already added to the blocking collection.

Timer2 just logs the count state of both the BlockingCollection & the _tracker.

The START button kicks off a Parallel.ForEach, which simply iterates over the blocking collection's GetConsumingEnumerable() and prints each item to the second list box.

The STOP button stops Timer1, preventing more entries from being added to the blocking collection.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

public partial class Form1 : Form
{
    private int Counter = 0;
    private BlockingCollection<int> _entries;
    private ConcurrentDictionary<int, int> _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection<int>();
        _tracker = new ConcurrentDictionary<int, int>();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory();
        InitializeComponent();
    }

    private void timer1_Tick(object sender, EventArgs e)
    { //ADDING TIMER -> LISTBOX 1
        for (var i = 0; i < 3; i++, Counter++)
        {
            if (_tracker.TryAdd(Counter, Counter))
                _entries.Add(Counter);
            listBox1.Items.Add(string.Format("Adding {0}", Counter));
        }
    }

    private void timer2_Tick_1(object sender, EventArgs e)
    { //LOGGING TIMER -> LIST BOX 3
        listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
    }

    private void button1_Click(object sender, EventArgs e)
    { //START BUTTON -> LOGS TO LIST BOX 2
        var options = new ParallelOptions {
            CancellationToken = _tokenSource.Token,
            MaxDegreeOfParallelism = 1
        };
        _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
        timer1.Enabled = timer2.Enabled = true;
        timer1.Start();
        timer2.Start();
    }

    private void DoWork(int entry)
    {
        Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
        Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
        int oldEntry;
        _tracker.TryRemove(entry, out oldEntry);
    }

    private void button2_Click(object sender, EventArgs e)
    { //STOP BUTTON
        timer1.Stop();
        timer1.Enabled = false;
    }
}
Here's the sequence of events:
You can see that the concurrent dictionary is still tracking 1 item that has not yet been processed and subsequently removed from _tracker.
If I press Start again, timer1 begins adding 3 more entries and the Parallel loop comes back to life, printing 5, 6, 7 & 8.
I'm at a complete loss as to why this occurs. Pressing Start again obviously starts a new task, which calls a Parallel.ForEach and re-executes GetConsumingEnumerable(), which magically finds the missing entry... I
Why doesn't BlockingCollection.GetConsumingEnumerable() guarantee that it will iterate over every item that's added to the collection?
Why does the addition of more entries subsequently cause it to get "unstuck" and continue with its processing?
You can't use GetConsumingEnumerable() in Parallel.ForEach().
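The limitation lies in how Parallel.ForEach pulls items from the enumerable, not in BlockingCollection<T> itself. As a minimal sketch (the class and method names are hypothetical, and it assumes C# 7 or later), a single consumer that iterates GetConsumingEnumerable() with a plain foreach receives each item as soon as it is added, even while the producer is still open:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class PlainConsumerDemo
{
    // Hypothetical demo: returns true if a lone foreach consumer sees a single
    // added item within timeoutMs, before CompleteAdding() has been called.
    public static bool SeesItemWithoutChunking(int timeoutMs)
    {
        var entries = new BlockingCollection<int>();
        var seen = new ManualResetEventSlim(false);

        Task consumer = Task.Run(() =>
        {
            // Each MoveNext() takes exactly one item, blocking only while the collection is empty.
            foreach (int entry in entries.GetConsumingEnumerable())
            {
                seen.Set(); // real work on `entry` would go here
            }
        });

        entries.Add(1);
        bool result = seen.Wait(timeoutMs);

        entries.CompleteAdding(); // ends the foreach so the consumer task can finish
        consumer.Wait();
        return result;
    }

    public static void Main()
    {
        Console.WriteLine(SeesItemWithoutChunking(5000));
    }
}
```

This gives you only one consumer, though; to keep Parallel.ForEach's parallelism without the stalls, it needs a partitioner that hands out one item at a time.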
Use the GetConsumingPartitioner from the TPL extras. In the blog post you will also find an explanation of why you can't use GetConsumingEnumerable():
The partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.
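In other words, Parallel.ForEach waits until it has received a whole chunk of work items before processing them, which is exactly what your experiment shows. While filling a chunk, the worker blocks inside GetConsumingEnumerable() waiting for the next item, so items it has already taken sit in its buffer until enough new entries arrive (or CompleteAdding() is called); that is why adding more entries gets the loop "unstuck".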
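As I understand it, the extras' GetConsumingPartitioner() avoids this by handing Parallel.ForEach a partitioner whose dynamic partitions are simply the consuming enumerable, so nothing is chunked. The class below is a hypothetical stand-in written along those lines, not the extras library's actual source; it assumes C# 7 or later. The demo method checks that one item is processed while the producer is still open:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the extras' GetConsumingPartitioner(); not its real source.
public sealed class ConsumingPartitioner<T> : Partitioner<T>
{
    private readonly BlockingCollection<T> _collection;

    public ConsumingPartitioner(BlockingCollection<T> collection)
    {
        _collection = collection ?? throw new ArgumentNullException(nameof(collection));
    }

    // Parallel.ForEach only accepts partitioners that support dynamic partitions.
    public override bool SupportsDynamicPartitions => true;

    // Static partitioning (used e.g. by PLINQ): one consuming enumerator per partition.
    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        IEnumerable<T> dynamicPartitions = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount)
                         .Select(_ => dynamicPartitions.GetEnumerator())
                         .ToArray();
    }

    // Each enumerator takes exactly one item per MoveNext() straight from the
    // collection, so no worker ever holds a partially filled chunk.
    public override IEnumerable<T> GetDynamicPartitions() => _collection.GetConsumingEnumerable();
}

public static class ConsumingPartitionerDemo
{
    // Adds one item and waits up to timeoutMs for the loop to process it
    // while CompleteAdding() has not been called yet.
    public static bool ProcessesSingleItemPromptly(int timeoutMs)
    {
        var entries = new BlockingCollection<int>();
        var processed = new ManualResetEventSlim(false);
        var options = new ParallelOptions { MaxDegreeOfParallelism = 1 };

        Task consumer = Task.Run(() =>
            Parallel.ForEach(new ConsumingPartitioner<int>(entries), options, entry => processed.Set()));

        entries.Add(42);
        bool sawItem = processed.Wait(timeoutMs);

        entries.CompleteAdding(); // lets the consuming enumerators finish so the loop exits
        consumer.Wait();
        return sawItem;
    }

    public static void Main()
    {
        Console.WriteLine(ProcessesSingleItemPromptly(5000));
    }
}
```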
As of .NET 4.5, you can create a partitioner that takes only 1 item at a time:
var partitioner = Partitioner.Create(jobsBatchesQ.queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner,
    new ParallelOptions { MaxDegreeOfParallelism = (currentTask.ParallelLevel > 0 ? currentTask.ParallelLevel : 1) },
    (batch, state) => { /* do stuff */ });
https://msdn.microsoft.com/en-us/library/system.collections.concurrent.enumerablepartitioneroptions(v=vs.110).aspx
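A fuller sketch of the NoBuffering approach as a self-contained console program. It assumes a local BlockingCollection<int> named queue in place of jobsBatchesQ.queue and a fixed MaxDegreeOfParallelism of 1 (matching the question) in place of currentTask.ParallelLevel; the class and method names are hypothetical. The producer adds one item at a time and waits for it to be processed before adding the next, a pattern that can stall with the default chunking partitioner:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class NoBufferingDemo
{
    // Hypothetical demo: adds `count` items one at a time and, before adding the
    // next one, waits up to timeoutMs for the consumer to have processed it.
    // Returns true only if every item was picked up while the producer was still open.
    public static bool EachItemProcessedBeforeNextIsAdded(int count, int timeoutMs)
    {
        var queue = new BlockingCollection<int>(); // stands in for jobsBatchesQ.queue
        int processed = 0;
        bool allPrompt = true;

        // NoBuffering: the partitioner hands out one item per request instead of chunks.
        var partitioner = Partitioner.Create(queue.GetConsumingEnumerable(),
                                             EnumerablePartitionerOptions.NoBuffering);
        var options = new ParallelOptions { MaxDegreeOfParallelism = 1 }; // stands in for currentTask.ParallelLevel

        Task consumer = Task.Run(() =>
            Parallel.ForEach(partitioner, options, item =>
            {
                // Real work on `item` would go here.
                Interlocked.Increment(ref processed);
            }));

        for (int i = 0; i < count; i++)
        {
            queue.Add(i);
            int expected = i + 1;
            if (!SpinWait.SpinUntil(() => Volatile.Read(ref processed) >= expected, timeoutMs))
            {
                allPrompt = false; // the item was not processed in time
            }
        }

        queue.CompleteAdding(); // without this the loop would keep waiting for more items
        consumer.Wait();
        return allPrompt && processed == count;
    }

    public static void Main()
    {
        Console.WriteLine(EachItemProcessedBeforeNextIsAdded(5, 5000));
    }
}
```

Note the CompleteAdding() call: GetConsumingEnumerable() only ends once the collection is marked complete and empty, so in the question's form a STOP button that also calls _entries.CompleteAdding() would let the Parallel.ForEach finish instead of blocking indefinitely.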