Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Ensuring task execution order in ThreadPool

I have been reading about the thread-pool pattern and I can't seem to find the usual solution for the following problem.

I sometimes want tasks to be executed serially. For example, I read chunks of text from a file and for some reason I need the chunks to be processed in that order. So basically I want to eliminate concurrency for some of the tasks.

Consider this scenario where the tasks with * need to be processed in the order they were pushed in. The other tasks can be processed in any order.

push task1 push task2 push task3   * push task4   * push task5 push task6   * .... and so on 

In the context of a thread-pool, without this constraint, a single queue of pending tasks works fine but clearly here it doesn't.

I thought about having some of the threads operate on a thread-specific queue and the others on the "global" queue. Then, in order to execute some of the tasks serially, I simply have to push them onto a queue where a single thread looks. It does sounds a bit clumsy.

So, the real question in this long story: how would you solve this ? How would you ensure those tasks are ordered?

EDIT

As a more general problem, suppose the scenario above becomes

push task1 push task2   ** push task3   * push task4   * push task5 push task6   * push task7   ** push task8   * push task9 .... and so on 

What I mean is that the tasks within a group should be executed sequentially, but the groups themselves can mix. So you can have 3-2-5-4-7 for example.

One other thing to note is that I don't have access to all the tasks in a group upfront (and I can't wait for all of them to arrive before starting the group).

like image 951
nc3b Avatar asked Aug 25 '11 14:08

nc3b


People also ask

What is the order in which tasks are done to execute a program?

The instruction cycle (also known as the fetch–decode–execute cycle, or simply the fetch-execute cycle) is the cycle that the central processing unit (CPU) follows from boot-up until the computer has shut down in order to process instructions.

Does task use ThreadPool?

By default, TPL types like Task and Task<TResult> use thread pool threads to run tasks. You can also use the thread pool by calling ThreadPool.

When should you not use ThreadPool?

Thread pools do not make sense when you need thread which perform entirely dissimilar and unrelated actions, which cannot be considered "jobs"; e.g., One thread for GUI event handling, another for backend processing. Thread pools also don't make sense when processing forms a pipeline.

What is the difference between thread and ThreadPool?

A thread pool is - as the name suggests - a pool of worker threads which are always running. Those threads then normally take tasks from a list, execute them, then try to take the next task. If there's no task, the thread will wait.


1 Answers

Something like the following will allow serial and parallel tasks to be queued, where serial tasks will be executed one after the other, and parallel tasks will be executed in any order, but in parallel. This gives you the ability to serialize tasks where necessary, also have parallel tasks, but do this as tasks are received i.e. you do not need to know about the entire sequence up-front, execution order is maintained dynamically.

internal class TaskQueue {     private readonly object _syncObj = new object();     private readonly Queue<QTask> _tasks = new Queue<QTask>();     private int _runningTaskCount;      public void Queue(bool isParallel, Action task)     {         lock (_syncObj)         {             _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });         }          ProcessTaskQueue();     }      public int Count     {         get{lock (_syncObj){return _tasks.Count;}}     }      private void ProcessTaskQueue()     {         lock (_syncObj)         {             if (_runningTaskCount != 0) return;              while (_tasks.Count > 0 && _tasks.Peek().IsParallel)             {                 QTask parallelTask = _tasks.Dequeue();                  QueueUserWorkItem(parallelTask);             }              if (_tasks.Count > 0 && _runningTaskCount == 0)             {                 QTask serialTask = _tasks.Dequeue();                  QueueUserWorkItem(serialTask);             }         }     }      private void QueueUserWorkItem(QTask qTask)     {         Action completionTask = () =>         {             qTask.Task();              OnTaskCompleted();         };          _runningTaskCount++;          ThreadPool.QueueUserWorkItem(_ => completionTask());     }      private void OnTaskCompleted()     {         lock (_syncObj)         {             if (--_runningTaskCount == 0)             {                 ProcessTaskQueue();             }         }     }      private class QTask     {         public Action Task { get; set; }         public bool IsParallel { get; set; }     } } 

Update

To handle task groups with serial and parallel task mixes, a GroupedTaskQueue can manage a TaskQueue for each group. Again, you do not need to know about groups up-front, it is all dynamically managed as tasks are received.

internal class GroupedTaskQueue {     private readonly object _syncObj = new object();     private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();     private readonly string _defaultGroup = Guid.NewGuid().ToString();      public void Queue(bool isParallel, Action task)     {         Queue(_defaultGroup, isParallel, task);     }      public void Queue(string group, bool isParallel, Action task)     {         TaskQueue queue;          lock (_syncObj)         {             if (!_queues.TryGetValue(group, out queue))             {                 queue = new TaskQueue();                  _queues.Add(group, queue);             }         }          Action completionTask = () =>         {             task();              OnTaskCompleted(group, queue);         };          queue.Queue(isParallel, completionTask);     }      private void OnTaskCompleted(string group, TaskQueue queue)     {         lock (_syncObj)         {             if (queue.Count == 0)             {                 _queues.Remove(group);             }         }     } } 
like image 63
Tim Lloyd Avatar answered Oct 02 '22 18:10

Tim Lloyd