
How do I use a BlockingCollection in the producer/consumer pattern when the producers are also the consumers? How do I end?

I have a recursive problem where the consumers do some work at each level of a tree, then need to recurse down the tree and perform that same work at the next level.

I want to use ConcurrentBag/BlockingCollection etc. to run this in parallel. In this scenario, the consumers of the queue are also the producers for the queue!

My problem is this: using BlockingCollection, I can write very simple foreach logic to dequeue items and queue new ones. When the queue is empty, the blocking collection will block correctly and wait for new work to be produced by one of the other consumers.

But how do I know if all of the consumers are blocking?!

I know about CompleteAdding(), but that does not seem to serve, as the only time you are actually complete is when all of the producers are done producing and the queue is empty. Since they would all be blocking at that point, there is nobody "free" to call CompleteAdding(). Is there a way to detect this? (Perhaps an event that can fire when blocking, and fire again when unblocked?)

I can deal with this manually by not using a foreach and instead having a while(!complete) loop that uses TryTake, but then I need to sleep manually, which seems inefficient (avoiding that is the whole reason to have the blocking collection vs just the concurrent collections in the first place!). Each time through the loop, if TryTake returns false, I could set an Idle flag, and then have a Master check whether the queue is empty and all threads are idle, and if so set a complete flag. But again, this seems kludgy.

Intuition is telling me there is some way to use BlockingCollection to do this, but I can't quite get there.

Anyway, does anyone have a good pattern for when the consumers are also the producers? Being able to detect when to release all the blocks would be awesome.

Asked by Jason Coyne, Jun 01 '12 14:06




1 Answer

Here is a low-level implementation of a collection similar to the BlockingCollection<T>, with the difference that it completes automatically instead of relying on manually calling the CompleteAdding method. The condition for the automatic completion is that the collection is empty, and all the consumers are in a waiting state.

using System;
using System.Collections.Generic;
using System.Threading;

/// <summary>
/// A blocking collection that completes automatically when it's empty and all
/// consuming enumerables are in a waiting state.
/// </summary>
public class AutoCompleteBlockingCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private int _consumersCount = 0;
    private int _waitingConsumers = 0;
    private bool _autoCompleteStarted;
    private bool _completed;

    public int Count { get { lock (_queue) return _queue.Count; } }
    public bool IsCompleted => Volatile.Read(ref _completed);

    public void Add(T item)
    {
        lock (_queue)
        {
            if (_completed) throw new InvalidOperationException(
                "The collection has completed.");
            _queue.Enqueue(item);
            Monitor.Pulse(_queue);
        }
    }

    /// <summary>
    /// Begin observing the condition for automatic completion.
    /// </summary>
    public void BeginObservingAutoComplete()
    {
        lock (_queue)
        {
            if (_autoCompleteStarted) return;
            _autoCompleteStarted = true;
            Monitor.PulseAll(_queue);
        }
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        bool waiting = false;
        lock (_queue) _consumersCount++;
        try
        {
            while (true)
            {
                T item;
                lock (_queue)
                {
                    if (_completed) yield break;
                    while (_queue.Count == 0)
                    {
                        if (_autoCompleteStarted &&
                            _waitingConsumers == _consumersCount - 1)
                        {
                            _completed = true;
                            Monitor.PulseAll(_queue);
                            yield break;
                        }
                        waiting = true; _waitingConsumers++;
                        Monitor.Wait(_queue);
                        waiting = false; _waitingConsumers--;
                        if (_completed) yield break;
                    }
                    item = _queue.Dequeue();
                }
                yield return item;
            }
        }
        finally
        {
            lock (_queue)
            {
                _consumersCount--;
                if (waiting) _waitingConsumers--;
                if (!_completed && _autoCompleteStarted &&
                    _waitingConsumers == _consumersCount)
                {
                    _completed = true;
                    Monitor.PulseAll(_queue);
                }
            }
        }
    }
}

The AutoCompleteBlockingCollection<T> offers only the most basic functionality of the BlockingCollection<T> class. Features like bounded capacity and cancellation are not supported.

Usage example:

var queue = new AutoCompleteBlockingCollection<Node>();
queue.Add(rootNode);
queue.BeginObservingAutoComplete();
Task[] workers = Enumerable.Range(1, 4).Select(_ => Task.Run(() =>
{
    foreach (Node node in queue.GetConsumingEnumerable())
    {
        Process(node);
        foreach (Node child in node.Children)
            queue.Add(child);
    }
})).ToArray();
await Task.WhenAll(workers);

The BeginObservingAutoComplete method should be called after adding the initial items to the collection. Before this method is called, the auto-complete condition is not checked. In the above example only one item is added before starting to observe the auto-complete condition. Then four workers are launched. Each worker consumes the collection, processes each consumed node, and adds the children of that node to the collection. Eventually all nodes of the tree will be consumed, and the last active worker will trigger the automatic completion of the collection. This allows all workers to exit the consuming loop and complete.

Adding and removing consumers at any time (dynamically) is supported. The collection is thread-safe.

A feature-rich but less efficient implementation of the above collection can be found here.
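
For comparison, a similar termination condition can be sketched with the stock BlockingCollection<T> by counting outstanding items: increment a counter before each Add, decrement it once an item has been fully processed, and call CompleteAdding() when the counter reaches zero. This is a different technique from the collection above and is only a minimal sketch. The names Node, PendingCounterDemo, ProcessTree, pending and processed are hypothetical, and incrementing processed stands in for the real work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical node type, used only for this illustration.
public sealed class Node
{
    public List<Node> Children { get; } = new List<Node>();
}

public static class PendingCounterDemo
{
    // Walks the tree with workerCount workers and returns the number of nodes processed.
    public static int ProcessTree(Node root, int workerCount)
    {
        using (var queue = new BlockingCollection<Node>())
        {
            int pending = 1;   // items added but not yet fully processed (the root)
            int processed = 0; // stand-in for the real per-node work
            queue.Add(root);

            Task[] workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(() =>
            {
                foreach (Node node in queue.GetConsumingEnumerable())
                {
                    Interlocked.Increment(ref processed);
                    foreach (Node child in node.Children)
                    {
                        // Count the child before it becomes visible to other workers.
                        Interlocked.Increment(ref pending);
                        queue.Add(child);
                    }
                    // This node is finished. If nothing else is outstanding,
                    // no worker can produce more items, so the queue can be completed.
                    if (Interlocked.Decrement(ref pending) == 0)
                        queue.CompleteAdding();
                }
            })).ToArray();

            Task.WaitAll(workers);
            return processed;
        }
    }
}
```

The counter can only reach zero after every added node has been processed, because each child is counted before its parent is counted as done. If the per-node work can throw, the decrement probably belongs in a finally block, otherwise the remaining workers would stay blocked.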

Answered by Theodor Zoulias, Sep 21 '22 11:09