
Is it good to use BlockingCollection<T> as a single-producer, single-consumer FIFO queue?

I need a single-producer, single-consumer FIFO queue because:

  • I need to process messages in the order they are received.
  • I need to do this asynchronously, because the caller should not wait while I'm processing a message.
  • Processing of the next message should start only when processing of the previous one has finished. Sometimes messages arrive faster than I can process them. On average I should be able to keep up; I just occasionally have to queue a batch of them.

So it's much like TCP/IP, I think: there is one producer and one consumer, messages can SOMETIMES arrive faster than they can be processed, so they have to be queued. Order IS important, and the caller is not at all interested in what I do with the messages.

This sounds easy enough, and I could probably use a plain Queue for it, but I want to use BlockingCollection because I don't want to write any code with ManualResetEvent and the like.

How suitable is BlockingCollection for my task? Or can you suggest something else?

Oleg Vazhnev asked Apr 11 '12 09:04



1 Answer

BlockingCollection<T> wraps an IProducerConsumerCollection<T> (a ConcurrentQueue<T> by default, which gives FIFO order), so it fits your requirements well.

You can create two Tasks: one for the asynchronous producer and another for the consumer worker. The former adds items to the BlockingCollection, and the latter consumes them in FIFO order as soon as they become available.

Producer-consumer sample application using TPL Tasks and BlockingCollection:

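using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class ProducerConsumer
{
    // Unbounded; backed by a ConcurrentQueue<T> by default, so items come out in FIFO order.
    private static readonly BlockingCollection<string> queue = new BlockingCollection<string>();

    static void Main(string[] args)
    {
        Start();
    }

    public static void Start()
    {
        var producerWorker = Task.Factory.StartNew(() => RunProducer());
        var consumerWorker = Task.Factory.StartNew(() => RunConsumer());

        // Returns once the producer has finished and the consumer has drained the queue.
        Task.WaitAll(producerWorker, consumerWorker);
    }

    private static void RunProducer()
    {
        int itemsCount = 100;

        while (itemsCount-- > 0)
        {
            queue.Add(itemsCount + " - " + Guid.NewGuid().ToString());
            Thread.Sleep(250); // Simulate messages arriving over time.
        }

        // Signal that no more items will arrive. Without this call the consumer's
        // foreach never ends and Task.WaitAll blocks forever.
        queue.CompleteAdding();
    }

    private static void RunConsumer()
    {
        // Blocks while the queue is empty; exits after CompleteAdding once the queue is drained.
        foreach (var item in queue.GetConsumingEnumerable())
        {
            Console.WriteLine(DateTime.Now.ToString("HH:mm:ss.ffff") + " | " + item);
        }
    }
}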
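
For the situation in the question, where the caller only hands a message over and returns immediately, the same pattern can be wrapped in a small class. This is a minimal sketch, not a definitive implementation: the names MessagePump, Post and handler are hypothetical and chosen for illustration only. It assumes the handler does not throw; if it does, the exception surfaces from Dispose as an AggregateException.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical wrapper (illustration only): a single background consumer drains
// the queue, so the handler runs for one message at a time, in arrival order.
public sealed class MessagePump<T> : IDisposable
{
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();
    private readonly Task consumer;

    public MessagePump(Action<T> handler)
    {
        // LongRunning hints that the consumer deserves a dedicated thread,
        // because it blocks whenever the queue is empty.
        consumer = Task.Factory.StartNew(() =>
        {
            foreach (T message in queue.GetConsumingEnumerable())
            {
                handler(message);
            }
        }, TaskCreationOptions.LongRunning);
    }

    // The collection is unbounded, so Add does not block on capacity:
    // the caller never waits for the message to be processed.
    public void Post(T message)
    {
        queue.Add(message);
    }

    // Stops accepting messages, waits until queued ones are handled, then cleans up.
    public void Dispose()
    {
        queue.CompleteAdding();
        consumer.Wait();
        queue.Dispose();
    }
}
```

A caller would create one pump, call Post from the receiving code, and dispose of the pump on shutdown. If memory growth during bursts is a concern, the BlockingCollection<T> constructor that takes a bounded capacity makes Add block once the limit is reached, at the cost of making the producer wait.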

IProducerConsumerCollection:

Defines methods to manipulate thread-safe collections intended for producer/consumer usage. This interface provides a unified representation for producer/consumer collections so that higher level abstractions such as System.Collections.Concurrent.BlockingCollection(Of T) can use the collection as the underlying storage mechanism.
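
To make the "underlying storage" point concrete, here is a small sketch that passes the storage collection to BlockingCollection<T> explicitly. StorageDemo and RoundTrip are hypothetical names used only for this illustration. ConcurrentQueue<T> is also what the parameterless constructor uses, so the result matches the default behaviour.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper (illustration only).
public static class StorageDemo
{
    // Adds 1..count, marks the collection as complete, then drains it into a list.
    public static List<int> RoundTrip(int count)
    {
        // The IProducerConsumerCollection<T> passed here decides the ordering;
        // ConcurrentQueue<T> gives first-in, first-out.
        using (var queue = new BlockingCollection<int>(new ConcurrentQueue<int>()))
        {
            for (int i = 1; i <= count; i++)
            {
                queue.Add(i);
            }
            queue.CompleteAdding();

            var drained = new List<int>();
            foreach (int item in queue.GetConsumingEnumerable())
            {
                drained.Add(item);
            }
            return drained;
        }
    }
}
```

Swapping in a ConcurrentStack<T> would give last-in, first-out order instead, which is why the FIFO behaviour depends on keeping the queue-based storage.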

sll answered Sep 23 '22 19:09