Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Implementing a blocking queue in C#

I've use the below code to implement and test a blocking queue. I test the queue by starting up 5 concurrent threads (the removers) to pull items off the queue, blocking if the queue is empty and 1 concurrent thread (the adder) to add items to the queue intermitently. However, if I leave it running for long enough I get an exception because one of the remover threads comes out of a waiting state even when the queue is empty.

Does anyone know why I get the exception? Note, I'm interested in knowing why this doesn't work as opposed to a working solution (as I can just Google that).

I'd greatly appreciate your help.

using System;
using System.Threading;
using System.Collections.Generic;

namespace Code
{
    class Queue<T>
    {
        private List<T> q = new List<T>();

        public void Add(T item)
        {
            lock (q)
            {
                q.Add(item);
                if (q.Count == 1)
                {
                    Monitor.Pulse(q);
                }
            }
        }

        public T Remove()
        {
            lock (q)
            {
                if (q.Count == 0)
                {
                    Monitor.Wait(q);
                }
                T item = q[q.Count - 1];
                q.RemoveAt(q.Count - 1);
                return item;
            }
        }
    }

    class Program
    {
        static Random r = new Random();
        static Queue<int> q = new Queue<int>();
        static int count = 1;
        static void Adder()
        {
            while (true)
            {
                Thread.Sleep(1000 * ((r.Next() % 5) + 1));
                Console.WriteLine("Will try to add");
                q.Add(count++);
            }
        }

        static void Remover()
        {
            while (true)
            {
                Thread.Sleep(1000 * ((r.Next() % 5) + 1));
                Console.WriteLine("Will try to remove");
                int item = q.Remove();
                Console.WriteLine("Removed " + item);
            }
        }

        static void Main(string[] args)
        {
            Console.WriteLine("Test");

            for (int i = 0; i < 5; i++)
            {
                Thread remover = new Thread(Remover);
                remover.Start();
            }

            Thread adder = new Thread(Adder);
            adder.Start();
        }
    }
}
like image 331
user1229458 Avatar asked Feb 23 '12 22:02

user1229458


People also ask

How do you implement a blocking queue?

Put() Implementation in Blocking Queue This implementation is very similar to enQueue() method. Once the capacity is reached, the thread is blocked or else it's a simple enQueue operation using LinkedList. Once the element is queued, we notify in case other waiting threads are blocked due to an empty queue.

What is a blocking queue?

BlockingQueue is a java Queue that support operations that wait for the queue to become non-empty when retrieving and removing an element, and wait for space to become available in the queue when adding an element.

What is blocking queue in multithreading?

BlockingQueue , represents a queue which is thread safe to put elements into, and take elements out of from. In other words, multiple threads can be inserting and taking elements concurrently from a Java BlockingQueue , without any concurrency issues arising.

How will you implement your own thread safe blocking queue in Java?

We will use lock and conditions objects to create custom BlockingQueue. While putting the element in the queue, if the queue is full, then the producer will wait for queue to empty. While consuming element from the queue, if the queue is empty then the consumer will wait for the queue to get filled.


1 Answers

if I leave it running for long enough I get an exception because one of the remover threads comes out of a waiting state even when the queue is empty. Does anyone know why I get the exception?

The question is odd, because obviously you know the answer: your first sentence answers the question asked by the second sentence. You get the exception because a remover thread comes out of the wait state when the queue is empty.

To solve the problem you'll want to use a loop instead of an "if". The correct code is:

while(q.Count == 0) Monitor.Wait(q);

not

if(q.Count == 0) Monitor.Wait(q);

UPDATE:

A commenter points out that perhaps your question was intended to be "under what circumstances can a consumer thread obtain the monitor when the queue is empty?"

Well, you are in a better position to answer that than we are, since you're the one running the program and looking at the output. But just off the top of my head, here's a way that could happen:

  • Consumer Thread 1: waiting
  • Consumer Thread 2: ready
  • Producer Thread 3: owns the monitor
  • There is one element in the queue.
  • Thread 3 pulses.
  • Thread 1 goes to ready state.
  • Thread 3 abandons the monitor.
  • Thread 2 enters the monitor.
  • Thread 2 consumes the item in the queue
  • Thread 2 abandons the monitor.
  • Thread 1 enters the monitor.

And now thread 1 is in the monitor with an empty queue.

Generally speaking when reasoning about these sorts of problems you should think of "Pulse" as being like a pigeon with a note attached to it. Once released it has no connection to the sender, and if it cannot find its home, it dies in the wilderness with its message undelivered. All you know when you Pulse is that if there is any thread waiting then one thread will move to the ready state at some time in the future; you don't know anything else about the relative timing of operations on threads.

like image 190
Eric Lippert Avatar answered Sep 18 '22 17:09

Eric Lippert