Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

C# queueing dependant tasks to be processed by a thread pool

I want to queue dependant tasks across several flows that need to be processed in order (in each flow). The flows can be processed in parallel.

To be specific, let's say I need two queues and I want the tasks in each queue to be processed in order. Here is sample pseudocode to illustrate the desired behavior:

Queue1_WorkItem wi1a=...;

enqueue wi1a;

... time passes ...

Queue1_WorkItem wi1b=...;

enqueue wi1b; // This must be processed after processing of item wi1a is complete

... time passes ...

Queue2_WorkItem wi2a=...;

enqueue wi2a; // This can be processed concurrently with the wi1a/wi1b

... time passes ...

Queue1_WorkItem wi1c=...;

enqueue wi1c; // This must be processed after processing of item wi1b is complete

Here is a diagram with arrows illustrating dependencies between work items:

enter image description here

The question is how do I do this using C# 4.0/.NET 4.0? Right now I have two worker threads, one per queue and I use a BlockingCollection<> for each queue. I would like to instead leverage the .NET thread pool and have worker threads process items concurrently (across flows), but serially within a flow. In other words I would like to be able to indicate that for example wi1b depends on completion of wi1a, without having to track completion and remember wi1a, when wi1b arrives. In other words, I just want to say, "I want to submit a work item for queue1, which is to be processed serially with other items I have already submitted for queue1, but possibly in parallel with work items submitted to other queues".

I hope this description made sense. If not please feel free to ask questions in the comments and I will update this question accordingly.

Thanks for reading.

Update:

To summarize "flawed" solutions so far, here are the solutions from the answers section that I cannot use and the reason(s) why I cannot use them:

TPL tasks require specifying the antecedent task for a ContinueWith(). I do not want to maintain knowledge of each queue's antecedent task when submitting a new task.

TDF ActionBlocks looked promising, but it would appear that items posted to an ActionBlock are processed in parallel. I need for the items for a particular queue to be processed serially.

Update 2:

RE: ActionBlocks

It would appear that setting the MaxDegreeOfParallelism option to one prevents parallel processing of work items submitted to a single ActionBlock. Therefore it seems that having an ActionBlock per queue solves my problem with the only disadvantage being that this requires the installation and deployment of the TDF library from Microsoft and I was hoping for a pure .NET 4.0 solution. So far, this is the candidate accepted answer, unless someone can figure out a way to do this with a pure .NET 4.0 solution that doesn't degenerate to a worker thread per queue (which I am already using).

like image 504
Michael Goldshteyn Avatar asked Jun 27 '12 15:06

Michael Goldshteyn


1 Answers

I understand you have many queues and don't want to tie up threads. You could have an ActionBlock per queue. The ActionBlock automates most of what you need: It processes work items serially, and only starts a Task when work is pending. When no work is pending, no Task/Thread is blocked.

like image 108
usr Avatar answered Oct 29 '22 22:10

usr