Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

ConcurrentQueue that allows me to wait on one producer

I've a problem of Producer/Consumer. Currently I've a simple Queue surrounded by a lock.

I'm trying to replace it with something more efficient.

My first choice was to use a ConcurrentQueue, but I don't see how to make my consumer wait on the next produced message(without doing Thread.Sleep).

Also, I would like to be able to clear the whole queue if its size reach a specific number.

Can you suggest some existing class or implementation that would match my requirements?

like image 370
J4N Avatar asked Dec 18 '15 10:12

J4N


People also ask

What is a concurrent queue?

A concurrent queue is basically a queue which provides protection against multiple threads mutating its state and thus causing inconsistencies. A naive way to implement a concurrent queue may be to just slap locks in its enqueue and dequeue functions when they try to modify the head and tail.

Is concurrent queue FIFO?

Class ConcurrentLinkedQueue<E> An unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out).

Is concurrent queue thread-safe?

ConcurrentQueue is a thread-safe FIFO data structure. It's a specialized data structure and can be used in cases when we want to process data in a First In First Out manner.


1 Answers

Here is an example on how you can use the BlockingCollection class to do what you want:

BlockingCollection<int> blocking_collection = new BlockingCollection<int>();

//Create producer on a thread-pool thread
Task.Run(() =>
{
    int number = 0;

    while (true)
    {
        blocking_collection.Add(number++);

        Thread.Sleep(100); //simulating that the producer produces ~10 items every second
    }
});

int max_size = 10; //Maximum items to have

int items_to_skip = 0;

//Consumer
foreach (var item in blocking_collection.GetConsumingEnumerable())
{
    if (items_to_skip > 0)
    {
        items_to_skip--; //quickly skip items (to meet the clearing requirement)
        continue;
    }

    //process item
    Console.WriteLine(item);

    Thread.Sleep(200); //simulating that the consumer can only process ~5 items per second

    var collection_size = blocking_collection.Count;

    if (collection_size > max_size) //If we reach maximum size, we flag that we want to skip items
    {
        items_to_skip = collection_size;
    }
}
like image 147
Yacoub Massad Avatar answered Sep 29 '22 17:09

Yacoub Massad