Threadsafe FIFO Queue/Buffer

I need to implement a sort of task buffer. Basic requirements are:

  • Process tasks in a single background thread
  • Receive tasks from multiple threads
  • Process ALL received tasks, i.e. make sure the buffer is fully drained after a stop signal is received
  • Order of tasks received per thread must be maintained

I was thinking of implementing it using a Queue as shown below. I would appreciate feedback on the implementation. Are there better ways to implement such a thing?

public class TestBuffer
{
    private readonly object queueLock = new object();
    private Queue<Task> queue = new Queue<Task>();
    private bool running = false;

    public TestBuffer()
    {
    }

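    public void start()
    {
        // Set the flag before the worker starts, so that a stop() issued
        // right after start() is not overwritten by the worker thread
        lock (queueLock)
        {
            running = true;
        }

        Thread t = new Thread(new ThreadStart(run));
        t.Start();
    }

    private void run()
    {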
        bool run = true;
        while(run)
        {
            Task task = null;
            // Lock queue before doing anything
            lock (queueLock)
            {
                // While the queue is empty and we are still running, wait until
                // we're told something changed. Re-check in a loop because a
                // wake-up does not guarantee that an item is available.
                while (queue.Count == 0 && running)
                {
                    Monitor.Wait(queueLock);
                }

                // Check there is something in the queue
                // Note - there might not be anything in the queue if we were waiting for something to change and the queue was stopped
                if (queue.Count > 0)
                {
                    task = queue.Dequeue();
                }
            }

            // If something was dequeued, handle it
            if (task != null)
            {
                handle(task);
            }

            // Lock the queue again and check whether we need to run again
            // Note - Make sure we drain the queue even if we are told to stop before it is empty
            lock (queueLock)
            {
                run = queue.Count > 0 || running;
            }
        }
    }

    public void enqueue(Task toEnqueue)
    {
        lock (queueLock)
        {
            queue.Enqueue(toEnqueue);
            Monitor.PulseAll(queueLock);
        }
    }

    public void stop()
    {
        lock (queueLock)
        {
            running = false;
            Monitor.PulseAll(queueLock);
        }
    }

    public void handle(Task dequeued)
    {
        dequeued.execute();
    }
}
user1300560 asked Sep 11 '12 18:09


2 Answers

You can handle this with the out-of-the-box BlockingCollection<T>.

It is designed for one or more producers and one or more consumers. In your case, you would have multiple producers and one consumer.

When you receive a stop signal, have the signal handler:

  • Signal producer threads to stop
  • Call CompleteAdding on the BlockingCollection instance

The consumer thread keeps running until all queued items have been removed and processed. At that point it sees that the BlockingCollection is marked complete and simply exits.
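The approach above could be sketched roughly as follows. This is a minimal illustration, not a drop-in replacement for the class in the question: the `BlockingBufferSketch` class, the use of `Action` as the work item type, and the three producer threads are assumptions made for the example. By default, BlockingCollection<T> is backed by a ConcurrentQueue<T>, so items come out in FIFO order and the order of items added by any single thread is preserved.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: several producers, one consumer, drain on stop.
public static class BlockingBufferSketch
{
    public static List<string> Run()
    {
        // Only the consumer thread writes to this list.
        var processed = new List<string>();

        using (var buffer = new BlockingCollection<Action>())
        {
            // Single consumer: blocks while the buffer is empty and leaves the
            // loop once CompleteAdding was called and the buffer is drained.
            var consumer = new Thread(() =>
            {
                foreach (Action work in buffer.GetConsumingEnumerable())
                {
                    work();
                }
            });
            consumer.Start();

            // Multiple producers: each adds its own items in order.
            var producers = new Thread[3];
            for (int p = 0; p < producers.Length; p++)
            {
                int id = p; // copy the loop variable for the closure
                producers[p] = new Thread(() =>
                {
                    for (int i = 0; i < 5; i++)
                    {
                        string label = "producer " + id + " item " + i;
                        buffer.Add(() => processed.Add(label));
                    }
                });
                producers[p].Start();
            }

            // "Stop signal": let the producers finish, then mark the
            // collection as complete so the consumer can drain and exit.
            foreach (Thread producer in producers)
            {
                producer.Join();
            }
            buffer.CompleteAdding();

            consumer.Join();
        }

        return processed;
    }

    public static void Main()
    {
        List<string> processed = Run();
        Console.WriteLine(processed.Count); // prints 15
    }
}
```

Note that calling Add after CompleteAdding throws an InvalidOperationException, which is why the sketch stops the producers before completing the collection.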

Eric J. answered Sep 28 '22 04:09


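Consider ConcurrentQueue<T>, which is a thread-safe FIFO collection. If it is not suitable, try one of its relatives among the Thread-Safe Collections (the System.Collections.Concurrent namespace). Using these lets you avoid some of the risks of writing the locking yourself.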
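One possible way to apply this to the question is sketched below. ConcurrentQueue<T> never blocks, so the consumer needs some way to wait for work; this sketch uses a SemaphoreSlim for that, which is a choice made for the example and not something the answer prescribes. The `ConcurrentQueueBuffer` and `Demo` names and the `Action` work item type are likewise hypothetical. The sketch assumes that no thread calls `Enqueue` after `Stop` has returned.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch: ConcurrentQueue<T> plus a semaphore as wake-up signal.
public class ConcurrentQueueBuffer
{
    private readonly ConcurrentQueue<Action> queue = new ConcurrentQueue<Action>();
    private readonly SemaphoreSlim signal = new SemaphoreSlim(0);
    private readonly Thread worker;
    private volatile bool running = true;

    public ConcurrentQueueBuffer()
    {
        worker = new Thread(Run);
        worker.Start();
    }

    public void Enqueue(Action work)
    {
        queue.Enqueue(work);
        signal.Release(); // one permit per queued item
    }

    public void Stop()
    {
        running = false;
        signal.Release(); // extra permit so the worker can notice the stop
        worker.Join();    // returns once the queue has been drained
    }

    private void Run()
    {
        while (true)
        {
            signal.Wait();
            Action work;
            if (queue.TryDequeue(out work))
            {
                work();
            }
            else if (!running)
            {
                // Queue is empty and stop was requested: nothing left to do.
                return;
            }
        }
    }
}

public static class Demo
{
    public static int RunDemo()
    {
        var buffer = new ConcurrentQueueBuffer();
        int handled = 0; // incremented only on the worker thread

        var producers = new Thread[2];
        for (int p = 0; p < producers.Length; p++)
        {
            producers[p] = new Thread(() =>
            {
                for (int i = 0; i < 10; i++)
                {
                    buffer.Enqueue(() => handled++);
                }
            });
            producers[p].Start();
        }

        foreach (Thread producer in producers)
        {
            producer.Join();
        }
        buffer.Stop();
        return handled;
    }

    public static void Main()
    {
        Console.WriteLine(RunDemo()); // prints 20
    }
}
```

Compared with BlockingCollection<T>, this version has to manage the wake-up signal and the stop flag by hand, which is the kind of bookkeeping the blocking collection already does internally.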

Qerts answered Sep 28 '22 03:09