What is the best approach for one fast producer and multiple slow consumers?

I'm looking for the best way to implement a one-producer, multiple-consumer multithreaded application. Currently I'm using a single queue as the shared buffer, but it's much slower than the one-producer, one-consumer case. I'm planning to do it like this:

static int N = 100;
static Queue<item>[] buffs;
static object[] _locks;

static void Produce()
{
    int curIndex = 0;
    while(true)
    {
        // Produce item;
        lock(_locks[curIndex])
        {
            buffs[curIndex].Enqueue(curItem);
            // Wake the consumer waiting on this buffer.
            Monitor.Pulse(_locks[curIndex]);
        }
        // Hand items to the consumers round-robin.
        curIndex = (curIndex+1)%N;
    }
}

static void Consume(int myIndex)
{
    item curItem;
    while(true)
    {
        lock(_locks[myIndex])
        {
            // Wait until the producer puts something in my buffer.
            while(buffs[myIndex].Count == 0)
                Monitor.Wait(_locks[myIndex]);
            curItem = buffs[myIndex].Dequeue();
        }
        // Consume item;
    }
}

static void Main()
{
    buffs = new Queue<item>[N];
    _locks = new object[N];
    Thread[] consumers = new Thread[N];
    for(int i = 0; i < N; i++)
    {
        buffs[i] = new Queue<item>();
        _locks[i] = new object();
        int index = i; // capture a copy for the closure
        consumers[i] = new Thread(() => Consume(index));
        consumers[i].Start();
    }
    Thread producer = new Thread(Produce);
    producer.Start();
}
asked Mar 23 '23 by Mehraban
2 Answers

Use a BlockingCollection

// Requires: using System.Collections.Concurrent; and using System.Threading;
static BlockingCollection<item> _buffer = new BlockingCollection<item>();

static void Produce()
{
    while(true)
    {
        // Produce item;
        _buffer.Add(curItem);
    }

    // Eventually stop producing (break out of the loop above), then signal
    // the consumers that no more items are coming:
    _buffer.CompleteAdding();
}

static void Consume()
{
    // GetConsumingEnumerable blocks while the buffer is empty and completes
    // once CompleteAdding has been called and the buffer has drained.
    foreach (var curItem in _buffer.GetConsumingEnumerable())
    {
        // Consume item;
    }
}

static void Main()
{
    int N = 100;
    Thread[] consumers = new Thread[N];
    for(int i = 0; i < N; i++)
    {
        consumers[i] = new Thread(Consume);
        consumers[i].Start();
    }
    Thread producer = new Thread(Produce);
    producer.Start();
}
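
Since your producer is fast and the consumers are slow, you may also want to give the collection a bounded capacity, e.g. new BlockingCollection<item>(1000) (the 1000 here is just an illustrative number). Add then blocks once the buffer is full, which throttles the producer instead of letting the queue grow without bound.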

If you don't want to fix the number of threads up front, you can use Parallel.ForEach instead.

static void Consume(item curItem)
{
    // consume item
}

static void Main()
{
    Thread producer = new Thread(Produce);
    producer.Start();

    Parallel.ForEach(_buffer.GetConsumingPartitioner(), Consume);
}
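
Note that GetConsumingPartitioner is not part of the framework itself; it comes from Microsoft's ParallelExtensionsExtras sample library. You could also pass _buffer.GetConsumingEnumerable() directly to Parallel.ForEach, but the default partitioner buffers items in chunks, so the partitioner version hands items to the consumers with less latency.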
answered Apr 06 '23 by adrianm

Using more threads won't help; it may even reduce performance. I suggest you try the ThreadPool, where every work item is one item created by the producer. However, that doesn't guarantee the produced items are consumed in the order they were produced.
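
A minimal sketch of that suggestion; ProduceItem and ConsumeItem are placeholder methods standing in for your own production and consumption logic, not part of this answer:

static void Produce()
{
    while (true)
    {
        item curItem = ProduceItem(); // placeholder: create the next item

        // Hand each item to the thread pool as its own work item; a pool
        // thread will pick it up. Order of consumption is not guaranteed.
        ThreadPool.QueueUserWorkItem(state => ConsumeItem((item)state), curItem);
    }
}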


Another way would be to reduce the number of consumers to, say, 4, and modify the way they work as follows:

The producer adds the new work to the queue; there's only one global queue for all worker threads. It then sets an event to indicate that new work is present, like this:

// Requires: using System.Threading;
static ManualResetEvent workPresent = new ManualResetEvent(false);
static Queue<item> workQueue = new Queue<item>();

static void Produce()
{
    while(true)
    {
        // Produce item;
        lock(workQueue)
        {
            workQueue.Enqueue(newItem);
            // Signal the consumers that work is available.
            workPresent.Set();
        }
    }
}

The consumers wait for work to be added to the queue. Only one consumer will get to do the job: it takes all the work from the queue and resets the event. The producer will not be able to add new work until that is done.

static void Consume()
{
    while(true)
    {
        // Block until the producer signals that work is present,
        // then reset the event before draining the queue.
        workPresent.WaitOne();
        workPresent.Reset();

        // Drain the shared queue into a local one so the lock is
        // held as briefly as possible.
        Queue<item> localWorkQueue = new Queue<item>();
        lock(workQueue)
        {
            while (workQueue.Count > 0)
                localWorkQueue.Enqueue(workQueue.Dequeue());
        }

        // Handle items in the local work queue
        foreach (var workItem in localWorkQueue)
        {
            // Consume workItem;
        }
    }
}
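
For completeness, a minimal sketch of how the producer and the (for example) four consumers from this answer might be wired up; the answer itself doesn't show this part:

static void Main()
{
    const int consumerCount = 4; // the reduced consumer count suggested above
    for (int i = 0; i < consumerCount; i++)
        new Thread(Consume).Start();

    new Thread(Produce).Start();
}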

The outcome of this, however, is a bit unpredictable: it could be that one thread ends up doing all the work while the others do nothing.

answered Apr 06 '23 by Thorsten Dittmar