
Delayed Producer Consumer pattern

Tags: c#, .net-3.5

I have a producer that produces integers in bursts (1 to 50 within a few seconds). I have a consumer that consumes those integers in blocks.

I want the consumer to start consuming once the producer has finished its burst. I have no control over the producer; I only know that a burst is over when nothing has been produced for 5 seconds.

I thought about these two different approaches:

First: a kind of linked consumer, where one wait handle notifies the other:

private readonly List<int> _ids = new List<int>();
private readonly ManualResetEvent _mainWaiter = new ManualResetEvent(false);
private readonly ManualResetEvent _secondaryWaiter = new ManualResetEvent(false);

// This method receives each id from the producer
public void OnConsumeId(int newId)
{
    lock(_ids)
    {
        _ids.Add(newId);
        _mainWaiter.Set();
        _secondaryWaiter.Set();
    }
}

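// This method runs on a dedicated thread:
public void ConsumerIdByBlock()
{
    while(true)
    {
        // Block until the first id of a burst arrives
        _mainWaiter.WaitOne();
        _mainWaiter.Reset();
        // Keep waiting as long as ids arrive less than 5 seconds apart
        while(_secondaryWaiter.WaitOne(5000))
            _secondaryWaiter.Reset();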

        List<int> localIds;
        lock(_ids)
        {
            localIds = new List<int>(_ids);
            _ids.Clear();
        }
        //Do the job with localIds
    }
}

Second: keep a kind of token that identifies the last update:

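private int _lastToken;

// This method receives each id from the producer
public void OnConsumeId(int newId)
{
    lock(_ids)
    {
        _ids.Add(newId);
        // Capture the token here, under the lock, so each work item keeps its own value
        int token = ++_lastToken;
        ThreadPool.QueueUserWorkItem(_ => ConsumerIdByBlock(token));
    }
}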

// This method runs on a thread-pool thread, once per produced id:
public void ConsumerIdByBlock(int myToken)
{       
    Thread.Sleep(5000);

    List<int> localIds;
    lock(_ids)
    {
        if(myToken !=_lastToken)
            return;     

        localIds = new List<int>(_ids);
        _ids.Clear();
    }

    //Do the job with localIds  
}

But I find these approaches a bit too complicated for what they do. Does a native or simpler solution exist? How would you do it?

asked May 01 '14 14:05 by Toto



1 Answer

This becomes a lot easier if you use a thread-safe queue that already has blocking, timeouts, and completion signaling built in. BlockingCollection<T> (in System.Collections.Concurrent) makes writing producer-consumer code really easy.

I like your "linked consumer" idea because you don't have to modify the producer in order to use it. That is, the producer just stuffs things in a queue. How the consumer ultimately uses it is irrelevant. Using BlockingCollection, then, you'd have:

BlockingCollection<ItemType> inputQueue = new BlockingCollection<ItemType>();
BlockingCollection<List<ItemType>> intermediateQueue = new BlockingCollection<List<ItemType>>();

Your producer adds things to the input queue by calling inputQueue.Add. Your intermediate consumer (call it the consolidator) gets things from the queue by calling TryTake with a timeout. For example:

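List<ItemType> items = new List<ItemType>();
while (!inputQueue.IsCompleted)
{
    ItemType t;
    // Keep draining while items arrive less than 5 seconds apart
    while (inputQueue.TryTake(out t, TimeSpan.FromSeconds(5)))
    {
        items.Add(t);
    }
    if (items.Count > 0)
    {
        // Add this list of items to the intermediate queue
        intermediateQueue.Add(items);
        items = new List<ItemType>();
    }
}
// The input is finished, so let the second consumer's loop end as well
intermediateQueue.CompleteAdding();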

The second consumer just reads the intermediate queue:

foreach (var itemsList in intermediateQueue.GetConsumingEnumerable())
{
    // do something with the items list
}
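
To see how the pieces fit together, including shutdown, here is a minimal end-to-end sketch. It assumes .NET Framework 4 or later, uses int items, and shortens the quiet period so it can be tried quickly. The class name BurstPipelineDemo, the Run method, and its parameters are hypothetical names chosen for illustration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BurstPipelineDemo
{
    // Feeds the given bursts through the two-stage pipeline and
    // returns the size of each block the second consumer received.
    public static List<int> Run(int[][] bursts, TimeSpan quietPeriod, TimeSpan gapBetweenBursts)
    {
        var inputQueue = new BlockingCollection<int>();
        var intermediateQueue = new BlockingCollection<List<int>>();
        var batchSizes = new List<int>();

        // Consolidator: groups ids that arrive less than quietPeriod apart.
        var consolidator = new Thread(() =>
        {
            var items = new List<int>();
            while (!inputQueue.IsCompleted)
            {
                int id;
                while (inputQueue.TryTake(out id, quietPeriod))
                    items.Add(id);
                if (items.Count > 0)
                {
                    intermediateQueue.Add(items);
                    items = new List<int>();
                }
            }
            // Nothing more will arrive, so let the second consumer finish.
            intermediateQueue.CompleteAdding();
        });

        // Second consumer: handles one block at a time.
        var consumer = new Thread(() =>
        {
            foreach (var batch in intermediateQueue.GetConsumingEnumerable())
                batchSizes.Add(batch.Count);
        });

        consolidator.Start();
        consumer.Start();

        // Producer: emits each burst, then stays silent for a while.
        foreach (var burst in bursts)
        {
            foreach (var id in burst) inputQueue.Add(id);
            Thread.Sleep(gapBetweenBursts);
        }
        inputQueue.CompleteAdding();

        consolidator.Join();
        consumer.Join();
        return batchSizes;
    }
}
```

With a gap between bursts that is comfortably longer than the quiet period, each burst should normally come out as its own block. The exact grouping depends on thread timing, so only the total number of ids is guaranteed.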

No need for ManualResetEvent or lock or any of that; BlockingCollection handles all the messy concurrency stuff for you.
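
One caveat: the question is tagged .net-3.5, and System.Collections.Concurrent (where BlockingCollection<T> lives) arrived with .NET Framework 4. For a 3.5 target, one possible alternative is a one-shot System.Threading.Timer that is restarted on every id, so its callback only fires after the quiet period. The sketch below is an illustration under that assumption, not part of the original answer. QuietPeriodBatcher, Add, Flush, and the onBatch callback are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: collects ids and hands them over as one block
// once no new id has arrived for the configured quiet period.
public sealed class QuietPeriodBatcher : IDisposable
{
    private readonly object _sync = new object();
    private readonly List<int> _ids = new List<int>();
    private readonly int _quietMs;
    private readonly Action<List<int>> _onBatch;
    private readonly Timer _timer;

    public QuietPeriodBatcher(int quietMs, Action<List<int>> onBatch)
    {
        _quietMs = quietMs;
        _onBatch = onBatch;
        // Created disarmed; the first Add arms it.
        _timer = new Timer(Flush, null, Timeout.Infinite, Timeout.Infinite);
    }

    // Called by the producer for every id.
    public void Add(int id)
    {
        lock (_sync)
        {
            _ids.Add(id);
            // Restart the countdown: one-shot, due after _quietMs of silence.
            _timer.Change(_quietMs, Timeout.Infinite);
        }
    }

    // Runs on a thread-pool thread when the countdown elapses.
    private void Flush(object state)
    {
        List<int> batch;
        lock (_sync)
        {
            if (_ids.Count == 0) return;
            batch = new List<int>(_ids);
            _ids.Clear();
        }
        _onBatch(batch); // do the work outside the lock
    }

    public void Dispose()
    {
        _timer.Dispose();
    }
}
```

The producer would call Add(newId) from OnConsumeId, and the block processing goes into the onBatch callback. One edge case to be aware of: if an id arrives at the very moment the timer fires, it can be flushed together with the previous burst instead of waiting for its own quiet period. The later timer tick then finds an empty list and returns.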

answered Oct 03 '22 19:10 by Jim Mischel