I adapted my parallel consumer implementation from the code in this question:
class ParallelConsumer<T> : IDisposable
{
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private readonly TaskFactory _factory = new TaskFactory();
    private CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private Task _task;

    public ParallelConsumer(int maxParallel, Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        try
        {
            _tokenSource = new CancellationTokenSource();
            _task = _factory.StartNew(
                () =>
                {
                    Parallel.ForEach(
                        _entries.GetConsumingEnumerable(),
                        new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
                        (item, loopState) =>
                        {
                            Log("Taking" + item);
                            if (!_tokenSource.IsCancellationRequested)
                            {
                                _action(item);
                                Log("Finished" + item);
                            }
                            else
                            {
                                Log("Not Taking" + item);
                                _entries.CompleteAdding();
                                loopState.Stop();
                            }
                        });
                },
                _tokenSource.Token);
        }
        catch (OperationCanceledException oce)
        {
            System.Diagnostics.Debug.WriteLine(oce);
        }
    }

    private void Log(string message)
    {
        Console.WriteLine(message);
    }

    public void Stop()
    {
        Dispose();
    }

    public void Enqueue(T entry)
    {
        Log("Enqueuing" + entry);
        _entries.Add(entry);
    }

    public void Dispose()
    {
        if (_task == null)
        {
            return;
        }
        _tokenSource.Cancel();
        while (!_task.IsCanceled)
        {
        }
        _task.Dispose();
        _tokenSource.Dispose();
        _task = null;
    }
}
And here is the test code:
class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100, 1);
    }

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount,
            (i) =>
            {
                flags[i] = true;
            }
        );
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));
    }
}
The test always fails: it always gets stuck at around the 93rd of the 100 items. Any idea which part of my code causes this issue, and how to fix it?
You cannot reliably use Parallel.ForEach() directly with BlockingCollection.GetConsumingEnumerable(), as you have discovered.
For an explanation, see this blog post:
https://devblogs.microsoft.com/pfxteam/parallelextensionsextras-tour-4-blockingcollectionextensions/
Excerpt from the blog:
BlockingCollection’s GetConsumingEnumerable implementation is using BlockingCollection’s internal synchronization which already supports multiple consumers concurrently, but ForEach doesn’t know that, and its enumerable-partitioning logic also needs to take a lock while accessing the enumerable.
As such, there’s more synchronization here than is actually necessary, resulting in a potentially non-negligable performance hit.
[Also] the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.
While this design can help with overall throughput, for scenarios that are focused more on low latency, that chunking can be prohibitive.
That blog also provides the source code for a method called GetConsumingPartitioner(), which you can use to solve the problem.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class BlockingCollectionExtensions
{
    // Wraps the collection in a partitioner that hands out items one at a time.
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }

    public class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        private readonly BlockingCollection<T> _collection;

        internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null)
                throw new ArgumentNullException("collection");
            _collection = collection;
        }

        public override bool SupportsDynamicPartitions
        {
            get { return true; }
        }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1)
                throw new ArgumentOutOfRangeException("partitionCount");
            var dynamicPartitioner = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount).Select(_ => dynamicPartitioner.GetEnumerator()).ToArray();
        }

        // Every partition reads straight from the consuming enumerable, so no chunking occurs.
        public override IEnumerable<T> GetDynamicPartitions()
        {
            return _collection.GetConsumingEnumerable();
        }
    }
}
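As a related sketch (not taken from the blog post), .NET 4.5 and later also ship a built-in way to switch off chunking: Partitioner.Create() with EnumerablePartitionerOptions.NoBuffering. The example below assumes that option is acceptable in place of the custom partitioner; the class name NoBufferingDemo and the Run helper are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class NoBufferingDemo
{
    // Hypothetical helper: feeds itemCount integers through a BlockingCollection
    // and returns how many of them the parallel loop actually processed.
    public static int Run(int itemCount, int maxParallel)
    {
        var flags = new bool[itemCount];
        using (var entries = new BlockingCollection<int>())
        {
            // NoBuffering asks the partitioner to hand out one item at a time
            // instead of collecting items into chunks first.
            var partitioner = Partitioner.Create(
                entries.GetConsumingEnumerable(),
                EnumerablePartitionerOptions.NoBuffering);

            var consumer = Task.Run(() =>
            {
                Parallel.ForEach(
                    partitioner,
                    new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
                    i => { flags[i] = true; });
            });

            for (int i = 0; i < itemCount; i++)
            {
                entries.Add(i);
            }

            // CompleteAdding ends the consuming enumerable so the loop can return.
            entries.CompleteAdding();
            consumer.Wait();
        }
        return flags.Count(f => f);
    }

    public static void Main()
    {
        Console.WriteLine(NoBufferingDemo.Run(100, 4));
    }
}
```

Here CompleteAdding() is called only so that the demo can wait for the loop to finish; with NoBuffering, items are processed as they arrive rather than once a chunk fills up.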
The test fails for the following reason, as explained here:
The partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.
To get it to work, you can add a method to your ParallelConsumer<T> class that signals that adding is complete, as below:
public void StopAdding()
{
    _entries.CompleteAdding();
}
Then call this method after your for loop, as below:
consumer.Start();
for (int i = 0; i < itemCount; i++)
{
    consumer.Enqueue(i);
}
consumer.StopAdding();
Otherwise, Parallel.ForEach() keeps waiting for enough items to fill its next chunk before it processes them, so the last few items stay stuck.
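A minimal self-contained sketch of the same idea, without the ParallelConsumer<T> wrapper: it calls CompleteAdding() once everything is enqueued and then waits on the consuming task rather than sleeping for a fixed time. The class name CompleteAddingDemo and the Run helper are assumptions made for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class CompleteAddingDemo
{
    // Hypothetical helper: returns true when every enqueued item was processed.
    public static bool Run(int itemCount, int maxParallel)
    {
        var flags = new bool[itemCount];
        using (var entries = new BlockingCollection<int>())
        {
            // Default (chunking) partitioner, as in the question's code.
            var consumer = Task.Run(() =>
            {
                Parallel.ForEach(
                    entries.GetConsumingEnumerable(),
                    new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
                    i => { flags[i] = true; });
            });

            for (int i = 0; i < itemCount; i++)
            {
                entries.Add(i);
            }

            // Signals the end of the stream, so a partially filled chunk is
            // released and the consuming enumerable terminates.
            entries.CompleteAdding();

            // Wait for the loop itself instead of using Thread.Sleep.
            consumer.Wait();
        }
        return flags.All(f => f);
    }

    public static void Main()
    {
        Console.WriteLine(CompleteAddingDemo.Run(100, 1));
    }
}
```

Waiting on the task makes the check independent of timing, whereas the Thread.Sleep(1000) in the original test only guesses how long processing will take.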