Parallel.ForEach stalls when used with BlockingCollection

I based my implementation of a parallel consumer on the code in this question:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class ParallelConsumer<T> : IDisposable
{
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private readonly TaskFactory _factory = new TaskFactory();
    private CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private Task _task;

    public ParallelConsumer(int maxParallel, Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        try
        {
            _tokenSource = new CancellationTokenSource();
            _task = _factory.StartNew(
                () =>
                {
                    Parallel.ForEach(
                        _entries.GetConsumingEnumerable(),
                        new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
                        (item, loopState) =>
                        {
                            Log("Taking" + item);
                            if (!_tokenSource.IsCancellationRequested)
                            {
                                _action(item);
                                Log("Finished" + item);
                            }
                            else
                            {
                                Log("Not Taking" + item);
                                _entries.CompleteAdding();
                                loopState.Stop();
                            }
                        });
                },
                _tokenSource.Token);
        }
        catch (OperationCanceledException oce)
        {
            System.Diagnostics.Debug.WriteLine(oce);
        }
    }

    private void Log(string message)
    {
        Console.WriteLine(message);
    }

    public void Stop()
    {
        Dispose();
    }

    public void Enqueue(T entry)
    {
        Log("Enqueuing" + entry);
        _entries.Add(entry);
    }

    public void Dispose()
    {
        if (_task == null)
        {
            return;
        }

        _tokenSource.Cancel();
        while (!_task.IsCanceled)
        {
        }

        _task.Dispose();
        _tokenSource.Dispose();
        _task = null;
    }
}

And here is the test code:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100, 1);
    }

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount,
                                              (i) =>
                                              {
                                                  flags[i] = true;
                                              }
            );
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));
    }
}

The test always fails - it always gets stuck at around the 93rd of the 100 items. Any idea which part of my code causes this issue, and how to fix it?

Asked by user69715, Jul 24 '13

2 Answers

You cannot use Parallel.ForEach() with BlockingCollection.GetConsumingEnumerable(), as you have discovered.

For an explanation, see this blog post:

https://devblogs.microsoft.com/pfxteam/parallelextensionsextras-tour-4-blockingcollectionextensions/

Excerpt from the blog:

BlockingCollection’s GetConsumingEnumerable implementation is using BlockingCollection’s internal synchronization which already supports multiple consumers concurrently, but ForEach doesn’t know that, and its enumerable-partitioning logic also needs to take a lock while accessing the enumerable.

As such, there’s more synchronization here than is actually necessary, resulting in a potentially non-negligible performance hit.

[Also] the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ uses chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.

While this design can help with overall throughput, for scenarios that are focused more on low latency, that chunking can be prohibitive.

That blog also provides the source code for a method called GetConsumingPartitioner(), which you can use to solve the problem:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class BlockingCollectionExtensions
{
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }

    public class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        private BlockingCollection<T> _collection;

        internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null)
                throw new ArgumentNullException("collection");

            _collection = collection;
        }

        public override bool SupportsDynamicPartitions
        {
            get { return true; }
        }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1)
                throw new ArgumentOutOfRangeException("partitionCount");

            var dynamicPartitioner = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount).Select(_ => dynamicPartitioner.GetEnumerator()).ToArray();
        }

        public override IEnumerable<T> GetDynamicPartitions()
        {
            return _collection.GetConsumingEnumerable();
        }
    }
}
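
For example, the Start() method from the question could then consume the partitioner like this (a minimal sketch, assuming the BlockingCollectionExtensions class above is in scope; the rest of ParallelConsumer<T> stays the same, and cancellation is left to the token in ParallelOptions):

_task = _factory.StartNew(
    () =>
    {
        Parallel.ForEach(
            // Consume via the partitioner instead of GetConsumingEnumerable(),
            // bypassing ForEach's default chunk-buffering partitioner.
            _entries.GetConsumingPartitioner(),
            new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
            item =>
            {
                Log("Taking " + item);
                _action(item);
                Log("Finished " + item);
            });
    },
    _tokenSource.Token);

On .NET 4.5 and later, Partitioner.Create(_entries.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering) achieves a similar non-buffering effect without the custom partitioner class.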
Answered by Matthew Watson, Sep 19 '22


The reason for the failure is explained here:

The partitioning algorithm employed by default by both Parallel.ForEach and PLINQ uses chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.

To get it to work, you can add a method to your ParallelConsumer<T> class to indicate that adding is complete, as below:

    public void StopAdding()
    {
        _entries.CompleteAdding();
    }

Now call this method after your for loop, as below:

        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        consumer.StopAdding();

Otherwise, Parallel.ForEach() keeps waiting until enough items are available to fill a chunk before it grabs them and starts processing, which is why the last few items are never consumed.
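
The effect of CompleteAdding() can also be seen in isolation (a minimal standalone sketch, independent of the question's class): once the collection is marked as complete, GetConsumingEnumerable() drains the remaining items and then terminates instead of blocking for more.

using System;
using System.Collections.Concurrent;

class CompleteAddingDemo
{
    static void Main()
    {
        var entries = new BlockingCollection<int>();
        for (int i = 0; i < 5; i++)
        {
            entries.Add(i);
        }

        // Without this call, the foreach below would block forever
        // after consuming the five items, waiting for more to arrive.
        entries.CompleteAdding();

        foreach (var item in entries.GetConsumingEnumerable())
        {
            Console.WriteLine("Consumed " + item);
        }

        Console.WriteLine("Enumeration finished.");
    }
}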

Answered by Prash, Sep 18 '22