 

Processing a queue of items asynchronously in C#

I am trying to create a system that processes a queue of work. The system has the following specifications:

  1. The system has two components, a work assigner and a worker.
  2. There is a set upper limit on the number of workers running at the same time. That upper limit is greater than one.
  3. To avoid problems with the same task being worked on twice, there is only a single work assigner.

What design would you use to create such a system? Here is what I am thinking:

  1. Create a collection of queues, one queue for each worker
  2. Create a Timer for the work assigner. Its job would be to populate the queues.
  3. Create a Timer for each worker, passing in a queue object as the object state to represent its workload
  4. Remove and add to the queues while they are locked.
  5. Use a counter that is incremented and decremented while locked to ensure that no more than the specified number of worker tasks runs at the same time. (A rough sketch of this design follows the list.)
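
Roughly, the skeleton I have in mind looks like the following (WorkItem, the intervals, and the distribution logic are just placeholders, and the running-worker counter from step 5 is omitted for brevity):

        // Sketch of the design above - names and intervals are placeholders.
        var workerCount = 4;
        var queues = new List<Queue<WorkItem>>();
        for (var i = 0; i < workerCount; i++) queues.Add(new Queue<WorkItem>());

        var sync = new object();

        // Work assigner Timer (step 2): populates the queues while locked (step 4).
        var assigner = new Timer(_ =>
        {
            lock (sync)
            {
                queues[0].Enqueue(new WorkItem()); // distribute real work across the queues here
            }
        }, null, 0, 1000);

        // One Timer per worker (step 3): its queue is passed in as the state object.
        var workers = queues.Select(q => new Timer(state =>
        {
            var myQueue = (Queue<WorkItem>)state;
            WorkItem item = null;
            lock (sync)
            {
                if (myQueue.Count > 0) item = myQueue.Dequeue(); // step 4
            }
            if (item != null)
            {
                // process the item
            }
        }, q, 0, 500)).ToList();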

I feel like there must be a better way to do this. What would you recommend? Should I switch from Timers to Threads for the workers? Should the threads just spin/wait while their queue is empty? Or should they exit, with the work assigner conditionally creating new ones?

asked Dec 12 '22 by jmacinnes

1 Answer

I don't know how long your tasks will be running, but it seems the best thing to do would be to use the ThreadPool. Furthermore, I would use, and actually have used, only one central queue - that alone removes some complexity. I have one Thread that handles the queue and performs an action on each item; in your case, that action would be to queue a task on the ThreadPool.

As for making the queue threadsafe, there is a ConcurrentQueue in System.Collections.Concurrent for that very purpose (msdn, benchmark vs locking queue).

Now, throw in a BlockingCollection (msdn) and you have all you need.

        // One central, thread-safe queue: a BlockingCollection backed by a ConcurrentQueue.
        BlockingCollection<Packet> sendQueue = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());

        // The single dispatcher thread runs this loop.
        while (true)
        {
            var packet = sendQueue.Take(); // this blocks if there are no items in the queue
            ThreadPool.QueueUserWorkItem(state =>
            {
                var data = (Packet)state;
                // do whatever you have to do with data
            }, packet);
        }

and somewhere else in the code, something calls sendQueue.Add(packet);
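
For completeness, the producer side is just the work assigner adding items; Packet here stands in for whatever your work item type actually is:

        // Packet is a placeholder for your actual work item type.
        public class Packet
        {
            public int Id { get; set; }
        }

        // The single work assigner adds items; Take() in the loop above then wakes up.
        sendQueue.Add(new Packet { Id = 42 });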

To sum up,

  1. One queue for all "workers"
  2. One thread that dequeues items from the queue and passes them to the ThreadPool.

I think that's it.

PS: if you have to control the number of threads, use "Smart Thread Pool" as suggested by josh3736.
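
If you would rather stick with the built-in ThreadPool, one alternative sketch is to gate the dispatch loop with a SemaphoreSlim (maxWorkers below is just a placeholder for whatever limit you need):

        // maxWorkers is a placeholder for your concurrency limit.
        var maxWorkers = 4;
        var slots = new SemaphoreSlim(maxWorkers, maxWorkers);

        while (true)
        {
            var packet = sendQueue.Take();
            slots.Wait(); // blocks once maxWorkers items are already in flight
            ThreadPool.QueueUserWorkItem(state =>
            {
                try
                {
                    var data = (Packet)state;
                    // do whatever you have to do with data
                }
                finally
                {
                    slots.Release(); // free the slot even if processing throws
                }
            }, packet);
        }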

answered Jan 05 '23 by Brunner