
Threadpool multi-queue job dispatch algorithm

I'm curious to know if there is a widely accepted solution for managing thread resources in a threadpool given the following scenario/constraints:

  1. Incoming jobs are all of the same nature and could be processed by any thread in the pool.
  2. Incoming jobs will be 'bucketed' into different queues based on some attribute of the incoming job such that all jobs going to the same bucket/queue MUST be processed serially.
  3. Some buckets will be less busy than others at different points during the lifetime of the program.

My question is on the theory behind a threadpool's implementation. What algorithm could be used to efficiently allocate available threads to incoming jobs across all buckets?

Edit: Another design goal would be to eliminate as much latency as possible between a job being enqueued and it being picked up for processing, assuming there are available idle threads.

Edit2: In the case I'm thinking of there are a relatively large number of queues (50-100) which have unpredictable levels of activity, but probably only 25% of them will be active at any given time.

The first (and most costly) solution I can think of is to simply have 1 thread assigned to each queue. While this will ensure incoming requests are picked up immediately, it is obviously inefficient.

The second solution is to combine the queues together based on expected levels of activity so that the number of queues is in line with the number of threads in the pool, allowing one thread to be assigned to each queue. The problem here is that incoming jobs which could otherwise be processed in parallel will be forced to wait on each other.

The third solution is to create the maximum number of queues, one for each set of jobs that must be processed serially, but only allocate threads based on the number of queues we expect to be busy at any given time (which could also be adjusted by the pool at runtime). So this is where my question comes in: Given that we have more queues than threads, how does the pool go about allocating idle threads to incoming jobs in the most efficient way possible?

I would like to know if there is a widely accepted approach. Or if there are different approaches - who makes use of which one? What are the advantages/disadvantages, etc?

Edit3: This might be best expressed in pseudocode.

hifier asked May 09 '11 05:05



2 Answers

You should probably drop constraint 2 from your specification. All you really need to comply with is that threads take up buckets and process the queues inside the buckets in order. It makes no sense to hand a serialized queue to another threadpool or to try to run serialized tasks in parallel. Your spec then simply becomes: the threads iterate over the FIFO in each bucket, and it's up to the pool manager to insert properly constructed buckets. So your bucket will be:

typedef struct fifo fifo_t; // opaque FIFO type, defined elsewhere

struct task_bucket
{
    void *ctx;      // context-relevant data
    fifo_t *queue;  // your FIFO
};

Then it's up to you to make the threadpool smart enough to know what to do on each iteration of the queue. For example the ctx can be a function pointer and the queue can contain data for that function, so the worker thread simply calls the function on each iteration with the provided data.
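As a rough illustration of that worker loop, here is a minimal single-threaded sketch. It is not the only way to do this: the linked-list `fifo_t`, the typed `handler` field (used instead of storing a function pointer in `ctx`, which ISO C does not guarantee to be portable), and the helper names `fifo_push`, `fifo_pop`, `process_bucket` and `append_digit` are all assumptions made for the example.

```c
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical minimal FIFO: a singly linked list of opaque, non-NULL items. */
typedef struct fifo_node {
    void *data;
    struct fifo_node *next;
} fifo_node;

typedef struct fifo {
    fifo_node *head;
    fifo_node *tail;
} fifo_t;

/* Assumed handler signature; not part of the original struct. */
typedef void (*job_fn)(void *ctx, void *data);

struct task_bucket {
    job_fn handler;   /* function the worker calls for every queued item */
    void *ctx;        /* context-relevant data passed to the handler */
    fifo_t *queue;    /* jobs that must run serially, in order */
};

/* Append an item to the tail; returns 0 on success, -1 on allocation failure. */
int fifo_push(fifo_t *q, void *data) {
    fifo_node *n = malloc(sizeof *n);
    if (n == NULL) return -1;
    n->data = data;
    n->next = NULL;
    if (q->tail) q->tail->next = n; else q->head = n;
    q->tail = n;
    return 0;
}

/* Remove and return the oldest item, or NULL if the FIFO is empty. */
void *fifo_pop(fifo_t *q) {
    fifo_node *n = q->head;
    if (n == NULL) return NULL;
    void *data = n->data;
    q->head = n->next;
    if (q->head == NULL) q->tail = NULL;
    free(n);
    return data;
}

/* Worker body: drain one bucket strictly in FIFO order.
   Returns the number of jobs processed. */
size_t process_bucket(struct task_bucket *b) {
    size_t done = 0;
    void *item;
    while ((item = fifo_pop(b->queue)) != NULL) {
        b->handler(b->ctx, item);
        done++;
    }
    return done;
}

/* Example handler: appends a decimal digit to a running number in ctx,
   which makes the processing order visible in the result. */
void append_digit(void *ctx, void *data) {
    long *acc = ctx;
    *acc = *acc * 10 + *(int *)data;
}
```

A worker that has been handed a bucket would simply call `process_bucket` on it; because only one worker touches a given bucket at a time, the FIFO itself needs no locking in this sketch.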

Reflecting the comments: If the size of the bucket list is known beforehand and isn't likely to change during the lifetime of the program, you'd need to figure out whether that is important to you. You will need some way for the threads to select a bucket to take. The easiest way is to have a FIFO queue that is filled by the manager and emptied by the threads. Classic reader/writer.
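One possible shape for that manager-to-worker FIFO is a bounded ring buffer guarded by a mutex and a condition variable. The sketch below assumes POSIX threads; `ready_queue`, `READY_CAP` and the three function names are hypothetical, and the fixed capacity is just a simplification.

```c
#include <pthread.h>
#include <stddef.h>

#define READY_CAP 128   /* hypothetical capacity for the sketch */

typedef struct fifo fifo_t;   /* opaque FIFO type, defined elsewhere */

struct task_bucket {
    void *ctx;       /* context-relevant data */
    fifo_t *queue;   /* your FIFO */
};

typedef struct {
    struct task_bucket *slots[READY_CAP];  /* ring buffer of ready buckets */
    size_t head, count;
    pthread_mutex_t lock;
    pthread_cond_t not_empty;
} ready_queue;

void ready_queue_init(ready_queue *rq) {
    rq->head = 0;
    rq->count = 0;
    pthread_mutex_init(&rq->lock, NULL);
    pthread_cond_init(&rq->not_empty, NULL);
}

/* Manager side: enqueue a bucket. Returns 0 on success, -1 if the queue is full. */
int ready_queue_put(ready_queue *rq, struct task_bucket *b) {
    int rc = -1;
    pthread_mutex_lock(&rq->lock);
    if (rq->count < READY_CAP) {
        rq->slots[(rq->head + rq->count) % READY_CAP] = b;
        rq->count++;
        rc = 0;
        pthread_cond_signal(&rq->not_empty);  /* wake one idle worker */
    }
    pthread_mutex_unlock(&rq->lock);
    return rc;
}

/* Worker side: block until a bucket is available, then take the oldest one. */
struct task_bucket *ready_queue_take(ready_queue *rq) {
    pthread_mutex_lock(&rq->lock);
    while (rq->count == 0)
        pthread_cond_wait(&rq->not_empty, &rq->lock);
    struct task_bucket *b = rq->slots[rq->head];
    rq->head = (rq->head + 1) % READY_CAP;
    rq->count--;
    pthread_mutex_unlock(&rq->lock);
    return b;
}
```

Each worker thread would loop on `ready_queue_take` and then drain the bucket it received, so an idle thread picks up the next bucket as soon as the manager publishes it.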

Another possibility is a heap. The worker removes the highest-priority bucket from the heap and processes its queue. Both removal by the workers and insertion by the manager reorder the heap so that the root node is always the highest priority.
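To make the heap variant concrete, here is a small array-backed binary max-heap of buckets. The integer `priority` key, the `heap_entry` and `bucket_heap` types and the function names are assumptions for illustration; locking is left out, so a real pool would wrap both operations in a mutex.

```c
#include <stddef.h>

#define HEAP_CAP 128   /* hypothetical capacity for the sketch */

struct task_bucket;    /* the bucket type from the answer, used only by pointer */

typedef struct {
    int priority;                /* assumed key: larger means more urgent */
    struct task_bucket *bucket;
} heap_entry;

typedef struct {
    heap_entry items[HEAP_CAP];
    size_t size;
} bucket_heap;

void swap_entries(heap_entry *a, heap_entry *b) {
    heap_entry tmp = *a;
    *a = *b;
    *b = tmp;
}

/* Manager side: insert a bucket and sift it up. Returns -1 if the heap is full. */
int heap_insert(bucket_heap *h, int priority, struct task_bucket *b) {
    if (h->size == HEAP_CAP) return -1;
    size_t i = h->size++;
    h->items[i].priority = priority;
    h->items[i].bucket = b;
    while (i > 0) {
        size_t parent = (i - 1) / 2;
        if (h->items[parent].priority >= h->items[i].priority) break;
        swap_entries(&h->items[parent], &h->items[i]);
        i = parent;
    }
    return 0;
}

/* Worker side: remove the root (highest priority) and sift the replacement down.
   Returns -1 if the heap is empty. */
int heap_remove_max(bucket_heap *h, heap_entry *out) {
    if (h->size == 0) return -1;
    *out = h->items[0];
    h->items[0] = h->items[--h->size];
    size_t i = 0;
    for (;;) {
        size_t left = 2 * i + 1, right = left + 1, largest = i;
        if (left < h->size &&
            h->items[left].priority > h->items[largest].priority)
            largest = left;
        if (right < h->size &&
            h->items[right].priority > h->items[largest].priority)
            largest = right;
        if (largest == i) break;
        swap_entries(&h->items[i], &h->items[largest]);
        i = largest;
    }
    return 0;
}
```

Both operations are O(log n) in the number of queued buckets, which is negligible for the 50-100 queues mentioned in the question.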

Both these strategies assume that the workers throw away the buckets and the manager makes new ones.

If keeping the buckets is important, you run the risk of workers only attending to the last modified task, so the manager will either need to reorder the bucket list or modify the priority of each bucket while the workers iterate looking for the highest priority. It is important that the memory behind ctx remains valid while threads are working; otherwise the threads will have to copy it as well. Workers can simply take a local copy of the queue pointer and set queue to NULL in the bucket.

Mel answered Nov 11 '22 02:11



ADDED: I now tend to agree that you might start simple and just keep a separate thread for each bucket, and look for something different only if this simple solution turns out to have problems. A better solution might then depend on exactly what problems the simple one causes.

In any case, I leave my initial answer below, appended with an afterthought.


You can make a special global queue of "job is available in bucket X" signals.

All idle workers would wait on this queue, and when a signal is put into the queue one thread will take it and proceed to the corresponding bucket to process jobs there until the bucket becomes empty.

When an incoming job is submitted into an in-order bucket, check whether a worker thread is already assigned to this bucket. If one is assigned, the new job will eventually be processed by that worker thread, so no signal should be sent. If no worker is assigned, check whether the bucket is empty or not. If empty, place a signal into the global signal queue that a new job has arrived in this bucket; if not empty, such a signal should have been made already and a worker thread should soon arrive, so do nothing.
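The submit-side check and the worker side described above could be sketched as follows, assuming POSIX threads and a single pool-wide lock. Everything named here (`pool`, `bucket`, `pool_submit`, `pool_claim`, `pool_next_job`, the fixed sizes, and jobs represented as plain `int` values) is hypothetical and chosen only to keep the example short.

```c
#include <pthread.h>
#include <stddef.h>

#define NBUCKETS   4    /* hypothetical sizes for the sketch */
#define BUCKET_CAP 16

typedef struct {
    int jobs[BUCKET_CAP];   /* ring buffer of pending jobs, processed in order */
    size_t head, count;
    int has_worker;         /* 1 while a worker is assigned to this bucket */
} bucket;

typedef struct {
    bucket buckets[NBUCKETS];
    size_t signals[NBUCKETS];   /* global queue of "job available in bucket X" */
    size_t sig_head, sig_count;
    pthread_mutex_t lock;
    pthread_cond_t has_signal;
} pool;

void pool_init(pool *p) {
    for (size_t i = 0; i < NBUCKETS; i++) {
        p->buckets[i].head = 0;
        p->buckets[i].count = 0;
        p->buckets[i].has_worker = 0;
    }
    p->sig_head = 0;
    p->sig_count = 0;
    pthread_mutex_init(&p->lock, NULL);
    pthread_cond_init(&p->has_signal, NULL);
}

/* Enqueue a job. Returns 1 if a signal was posted, 0 if none was needed,
   -1 if the bucket is full. */
int pool_submit(pool *p, size_t id, int job) {
    int posted = 0;
    pthread_mutex_lock(&p->lock);
    bucket *b = &p->buckets[id];
    if (b->count == BUCKET_CAP) {
        pthread_mutex_unlock(&p->lock);
        return -1;
    }
    int was_empty = (b->count == 0);
    b->jobs[(b->head + b->count) % BUCKET_CAP] = job;
    b->count++;
    /* Signal only if nobody is serving the bucket and no signal is pending.
       At most one signal per bucket is outstanding, so NBUCKETS slots suffice. */
    if (!b->has_worker && was_empty) {
        p->signals[(p->sig_head + p->sig_count) % NBUCKETS] = id;
        p->sig_count++;
        posted = 1;
        pthread_cond_signal(&p->has_signal);
    }
    pthread_mutex_unlock(&p->lock);
    return posted;
}

/* Idle worker: wait for a signal, mark that bucket as assigned, return its index. */
size_t pool_claim(pool *p) {
    pthread_mutex_lock(&p->lock);
    while (p->sig_count == 0)
        pthread_cond_wait(&p->has_signal, &p->lock);
    size_t id = p->signals[p->sig_head];
    p->sig_head = (p->sig_head + 1) % NBUCKETS;
    p->sig_count--;
    p->buckets[id].has_worker = 1;
    pthread_mutex_unlock(&p->lock);
    return id;
}

/* Assigned worker: fetch the next job. When the bucket is empty, release it
   (clear has_worker) and return 0 so the worker goes back to pool_claim. */
int pool_next_job(pool *p, size_t id, int *job) {
    int got = 0;
    pthread_mutex_lock(&p->lock);
    bucket *b = &p->buckets[id];
    if (b->count > 0) {
        *job = b->jobs[b->head];
        b->head = (b->head + 1) % BUCKET_CAP;
        b->count--;
        got = 1;
    } else {
        b->has_worker = 0;
    }
    pthread_mutex_unlock(&p->lock);
    return got;
}
```

A worker thread would alternate between `pool_claim` and a loop over `pool_next_job`, running each job outside the lock. Because the emptiness check and the clearing of `has_worker` happen under the same lock as `pool_submit`, a job that arrives just as a worker leaves still produces a fresh signal.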

ADDED: It occurred to me that my idea above can cause starvation for some jobs if the number of threads is less than the number of "active" buckets and there is a never-ending flow of incoming tasks. If all threads are already busy and a new job arrives in a bucket that is not yet being served, it may take a long time before a thread is freed to work on this new job. So there is a need to check whether there are idle workers and, if not, create a new one... which adds more complexity.

Alexey Kukanov answered Nov 11 '22 00:11
