
Analogue of Queue.Peek() for BlockingCollection when listening to consuming IEnumerable<T>

I'm using an implementation of the Pipelines pattern to decouple the message consumer from the producer and avoid the slow-consumer problem.

If an exception is thrown during the message processing stage [1], the message is lost and never dispatched to the other service/layer [2]. How can I handle this in [3] so that the message is not lost and, importantly, the order of messages is preserved, so that the upper service/layer receives messages in the order they arrived? I have an idea that involves another intermediate queue, but it seems complex. Unfortunately, BlockingCollection<T> does not expose any analogue of Queue.Peek(), which would let me read the next available message and call Dequeue() only after it has been processed successfully.

private BlockingCollection<IMessage> messagesQueue;    

// A TPL Task does the following:
// listens for new messages and processes each one as soon as it arrives
foreach (var cachedMessage in 
             messagesQueue.GetConsumingEnumerable(cancellation))
{    
    const int maxRetries = 3;
    int retriesCounter = 0;
    bool isSent = false;

    // At this point the message has already been removed from messagesQueue
    while (!isSent && retriesCounter++ < maxRetries)
    {
        try
        {
           // [1] Preprocess the message
           // [2] Dispatch it to another service/layer
           clientProxyCallback.SendMessage(cachedMessage);
           isSent = true;
        }                                
        catch(Exception exception)
        {
           // [3]   
           // logging
           if (!isSent && retriesCounter < maxRetries)
           {
              Thread.Sleep(NSeconds);
           }
        }            
    
        if (!isSent && retriesCounter == maxRetries)
        {
           // just log; the message is lost at this stage!
        }
    }
}

EDIT: I forgot to mention that this is an IIS-hosted WCF service that dispatches messages back to a Silverlight client WCF proxy via a client callback contract.

EDIT2: Below is how I would do this using Peek(). Am I missing something?

bool successfullySent = true;
try
{
   var item = queue.Peek();
   PreProcessItem(item);
   SendItem(item);       
}
catch(Exception exception)
{
   successfullySent = false;
}
finally
{
   if (successfullySent)
   {
       // just remove already sent item from the queue
       queue.Dequeue();
   }
}

EDIT3: Of course I could use the old-style approach with a while loop, a bool flag, a Queue and an AutoResetEvent, but I'm wondering whether the same is possible using BlockingCollection and GetConsumingEnumerable(). I think a facility like Peek would be very helpful together with a consuming enumerable. Without it, all the Pipelines pattern examples built on the newer BlockingCollection and GetConsumingEnumerable() do not look durable, and I would have to go back to the old approach.

asked Nov 27 '12 10:11 by sll


3 Answers

BlockingCollection<T> is a wrapper around IProducerConsumerCollection<T>, which is more generic than, e.g., ConcurrentQueue<T> and leaves implementers free not to provide a (Try)Peek method.

However, you can always call TryPeek on the underlying queue directly:

ConcurrentQueue<T> useOnlyForPeeking = new ConcurrentQueue<T>();
BlockingCollection<T> blockingCollection = new BlockingCollection<T>(useOnlyForPeeking);
...
useOnlyForPeeking.TryPeek(...)

Note, however, that you must not modify the queue via useOnlyForPeeking; otherwise blockingCollection will get confused and may throw InvalidOperationException at you. I'd be surprised, though, if calling the non-modifying TryPeek on this concurrent data structure were an issue.
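As a minimal sketch of that idea (the class name PeekSketch and the sample strings are illustrative assumptions, and a single consumer is assumed), the head can be read through the underlying ConcurrentQueue<T> and then removed through the BlockingCollection<T>:

```csharp
using System;
using System.Collections.Concurrent;

public static class PeekSketch
{
    public static void Main()
    {
        // Keep a reference to the underlying queue, used for peeking only.
        var useOnlyForPeeking = new ConcurrentQueue<string>();
        using (var blockingCollection = new BlockingCollection<string>(useOnlyForPeeking))
        {
            blockingCollection.Add("first");
            blockingCollection.Add("second");

            // TryPeek reads the head without removing it.
            if (useOnlyForPeeking.TryPeek(out var head))
            {
                Console.WriteLine($"peeked: {head}");   // prints "peeked: first"

                // Removal always goes through the BlockingCollection,
                // never through the underlying queue.
                var taken = blockingCollection.Take();
                Console.WriteLine($"taken: {taken}");   // prints "taken: first"
            }

            Console.WriteLine($"remaining: {blockingCollection.Count}"); // prints "remaining: 1"
        }
    }
}
```

TryPeek does not block, so it returns false on an empty queue. With more than one consumer, the peeked item may be taken by another thread before this one calls Take.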

answered Nov 07 '22 14:11 by Evgeniy Berezovsky


You should consider an intermediate queue.

BlockingCollection<T> can't "peek" at items because of its nature: there can be more than one consumer. One of them can peek at an item and another can take it, so the first one will then try to take an item that has already been taken.
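One way to picture the intermediate queue, as a rough sketch rather than a definitive implementation: a single consumer copies each taken item into a private Queue<T> and removes it only after a successful send. The names IntermediateQueueSketch, Consume and Flush are hypothetical helpers, and the simulated failure on "b" is made up for the demonstration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class IntermediateQueueSketch
{
    // Hypothetical helper, not a framework API.
    // Returns whatever is still undelivered when the producer has finished.
    public static Queue<T> Consume<T>(BlockingCollection<T> source, Action<T> send)
    {
        var pending = new Queue<T>();   // owned by this single consumer only
        foreach (var item in source.GetConsumingEnumerable())
        {
            pending.Enqueue(item);
            Flush(pending, send);
        }
        Flush(pending, send);           // one last attempt after completion
        return pending;
    }

    private static void Flush<T>(Queue<T> pending, Action<T> send)
    {
        while (pending.Count > 0)
        {
            try
            {
                send(pending.Peek());   // head stays queued while we try
                pending.Dequeue();      // remove only after a successful send
            }
            catch (Exception)
            {
                return;                 // keep the head; retry on the next flush
            }
        }
    }

    public static void Main()
    {
        var source = new BlockingCollection<string>();
        foreach (var m in new[] { "a", "b", "c" }) source.Add(m);
        source.CompleteAdding();

        var delivered = new List<string>();
        bool failedOnce = false;
        var leftover = Consume(source, m =>
        {
            if (m == "b" && !failedOnce)
            {
                failedOnce = true;
                throw new InvalidOperationException("simulated transient failure");
            }
            delivered.Add(m);
        });

        Console.WriteLine(string.Join(",", delivered)); // prints "a,b,c"
        Console.WriteLine(leftover.Count);              // prints "0"
    }
}
```

In this sketch a failed head is retried only when the next message arrives or the producer completes, so a real consumer would probably add a timed retry as well.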

answered Nov 07 '22 14:11 by Dennis


As Dennis says in his comment, BlockingCollection<T> provides a blocking wrapper around any implementation of the IProducerConsumerCollection<T> interface.

As you can see, IProducerConsumerCollection<T>, by design, does not define a Peek method or any other members needed to implement one. This means that BlockingCollection<T> cannot, as it stands, offer an analogue of Peek.

If you think about it, this greatly reduces the concurrency problems that a Peek implementation would trade for its utility. How can you consume without consuming? To Peek concurrently you would have to lock the head of the collection until the Peek operation completed, which I and the designers of BlockingCollection<T> view as sub-optimal. I think it would also be messy and difficult to implement, requiring some sort of disposable peek context.

If you consume a message and its consumption fails, you will have to deal with it. You could add it to a separate failures queue, re-add it to the normal processing queue for a future retry, just log the failure for posterity, or take some other action appropriate to your context.

If you don't want to consume the messages concurrently, then there is no need to use BlockingCollection<T>. You could use ConcurrentQueue<T> directly: adds are still synchronized, and you can use TryPeek safely since you control the single consumer. If consumption fails you could halt consumption with an infinite retry loop if you desire, although I suggest this requires some design thought.
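The failures-queue option might look roughly like the sketch below. FailureQueueSketch and Dispatch are hypothetical names, and the "bad" message that always fails is invented for the demonstration. Note that this variant gives up strict ordering: later messages are sent while the failed one waits in the failures queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class FailureQueueSketch
{
    // Hypothetical helper: sends every message and parks failures in a separate queue.
    public static ConcurrentQueue<T> Dispatch<T>(BlockingCollection<T> incoming, Action<T> send)
    {
        var failures = new ConcurrentQueue<T>();
        foreach (var message in incoming.GetConsumingEnumerable())
        {
            try
            {
                send(message);
            }
            catch (Exception)
            {
                // The message is already out of the collection, so keep it somewhere.
                failures.Enqueue(message);
            }
        }
        return failures;
    }

    public static void Main()
    {
        var incoming = new BlockingCollection<string>();
        foreach (var m in new[] { "a", "bad", "c" }) incoming.Add(m);
        incoming.CompleteAdding();

        var sent = new List<string>();
        var failures = Dispatch(incoming, m =>
        {
            if (m == "bad") throw new InvalidOperationException("simulated send failure");
            sent.Add(m);
        });

        Console.WriteLine(string.Join(",", sent)); // prints "a,c"
        Console.WriteLine(failures.Count);         // prints "1"
    }
}
```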
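A minimal sketch of the single-consumer approach, assuming a bounded number of attempts instead of an infinite loop: the head is read with TryPeek and removed with TryDequeue only after a successful send. SingleConsumerSketch and DrainInOrder are hypothetical names, and the permanently failing "b" message is made up for the demonstration.

```csharp
using System;
using System.Collections.Concurrent;

public static class SingleConsumerSketch
{
    // Hypothetical helper: sends messages in order and stops at the first one
    // that still fails after maxAttempts. Returns the number of messages sent.
    public static int DrainInOrder(ConcurrentQueue<string> queue, Action<string> send, int maxAttempts)
    {
        int sent = 0;
        while (queue.TryPeek(out var head))
        {
            bool ok = false;
            for (int attempt = 1; attempt <= maxAttempts && !ok; attempt++)
            {
                try
                {
                    send(head);
                    ok = true;
                }
                catch (Exception)
                {
                    // log here; a real consumer would probably wait before retrying
                }
            }

            if (!ok) break;          // the head stays in the queue, order is intact
            queue.TryDequeue(out _); // safe: this is the only consumer
            sent++;
        }
        return sent;
    }

    public static void Main()
    {
        var queue = new ConcurrentQueue<string>(new[] { "a", "b", "c" });

        int sent = DrainInOrder(queue, m =>
        {
            if (m == "b") throw new InvalidOperationException("simulated permanent failure");
        }, 3);

        Console.WriteLine($"sent: {sent}, left: {queue.Count}"); // prints "sent: 1, left: 2"
    }
}
```

Unlike BlockingCollection<T>, ConcurrentQueue<T> does not block when it is empty, so the consumer has to decide how to wait for new messages, for example with a signal from the producer.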

answered Nov 07 '22 15:11 by Jodrell