I have a producer that produces integers by burst (1 to 50 in a few seconds). I have a consumer that consumes those integers by block.
I want the consumer to start consuming when the producer has finished his burst (I don't have the lead on the producer, I would just know that it has finished producing when there is nothing produced for 5 seconds).
I thought about thoses 2 differents way :
First : using kind of one consumer notfying the other :
private readonly List<int> _ids = new List<int>();
private readonly ManualResetEvent _mainWaiter = new ManualResetEvent(false);
private readonly ManualResetEvent _secondaryWaiter = new ManualResetEvent(false);
//This methods consumes the id from the producer
public void OnConsumeId(int newId)
{
lock(_ids)
{
_ids.Add(newId);
_mainWaiter.Set();
_secondaryWaiter.Set();
}
}
//This methods runs on the dedicated thread :
public void ConsumerIdByBlock()
{
while(true)
{
_mainWaiter.Wait();
while(_secondaryWaiter.Wait(5000));
List<int> localIds;
lock(_ids)
{
localIds = new List<int>(_ids);
_ids.Clear();
}
//Do the job with localIds
}
}
Second : have a kind of token for the last update
//This methods consumes the id from the producer
private int _lastToken;
public void OnConsumeId(int newId)
{
lock(_ids)
{
_ids.Add(newId);
ThreadPool.Queue(()=>ConsumerIdByBlock(++_lastToken));
}
}
//This methods runs on the dedicated thread :
public void ConsumerIdByBlock(int myToken)
{
Thread.Sleep(5000);
List<int> localIds;
lock(_ids)
{
if(myToken !=_lastToken)
return;
localIds = new List<int>(_ids);
_ids.Clear();
}
//Do the job with localIds
}
But I find these approaches a bit too complicated for doing this. Does a native/simpler solution exists ? How would you do ?
The producer consumer pattern is a concurrency design pattern where one or more producer threads produce objects which are queued up, and then consumed by one or more consumer threads. The objects enqueued often represent some work that needs to be done.
Producer-Consumer problem is a classical synchronization problem in the operating system. With the presence of more than one process and limited resources in the system the synchronization problem arises. If one resource is shared between more than one process at the same time then it can lead to data inconsistency.
One example of a common producer/consumer relationship is print spooling. Although a printer might not be available when you want to print from an application (i.e., the producer), you can still “complete” the print task, as the data is temporarily placed on disk until the printer becomes available.
In computing, the producer-consumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, which share a common, fixed-size buffer used as a queue.
This becomes a lot easier if you use a thread-safe queue that already has notification and such. The BlockingCollection makes writing producer-consumer stuff really easy.
I like your "linked consumer" idea because you don't have to modify the producer in order to use it. That is, the producer just stuffs things in a queue. How the consumer ultimately uses it is irrelevant. Using BlockingCollection
, then, you'd have:
BlockingCollection<ItemType> inputQueue = new BlockingCollection<ItemType>();
BlockingCollection<List<ItemType>> intermediateQueue = new BlockingCollection<List<ItemType>>();
Your producer adds things to the input queue by calling inputQueue.Add
. Your intermediate consumer (call it the consolidator) gets things from the queue by calling TryTake with a timeout. For example:
List<ItemType> items = new List<ItemType>();
while (!inputQueue.IsCompleted)
{
ItemType t;
while (inputQueue.TryTake(out t, TimeSpan.FromSeconds(5))
{
items.Add(t);
}
if (items.Count > 0)
{
// Add this list of items to the intermediate queue
intermediateQueue.Add(items);
items = new List<ItemType>();
}
}
The second consumer just reads the intermediate queue:
foreach (var itemsList in intermediateQueue.GetConsumingEnumerable))
{
// do something with the items list
}
No need for ManualResetEvent
or lock
or any of that; BlockingCollection
handles all the messy concurrency stuff for you.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With