
How (and if) to write a single-consumer queue using the TPL?

I've heard a bunch of podcasts recently about the TPL in .NET 4.0. Most of them describe background activities like downloading images or doing a computation, using tasks so that the work doesn't interfere with a GUI thread.

Most of the code I work on has more of a multiple-producer / single-consumer flavor, where work items from multiple sources must be queued and then processed in order. One example would be logging, where log lines from multiple threads are serialized into a single queue for eventual writing to a file or database. All the records from any single source must remain in order, and records from the same moment in time should be "close" to each other in the eventual output.

So multiple threads or tasks or whatever are all invoking a queuer:

lock( _queue ) // or use a lock-free queue!
{
   _queue.Enqueue( some_work );
   _queueSemaphore.Release();
}

And a dedicated worker thread processes the queue:

while( _queueSemaphore.WaitOne() )
{
   lock( _queue )
   {
      some_work = _queue.Dequeue();
   }
   deal_with( some_work );
}

It's always seemed reasonable to dedicate a worker thread for the consumer side of these tasks. Should I write future programs using some construct from the TPL instead? Which one? Why?

asked Feb 19 '10 04:02 by Eric


2 Answers

You can use a long-running Task to process items from a BlockingCollection, as suggested by Wilka. Here's an example that pretty much meets your application's requirements. You'll see output something like this:

Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C

Note that the output from A, B, C and D appears in random order because it depends on the start time of the threads, but B always appears before B1.

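using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class LogItem 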
{
    public string Message { get; private set; }

    public LogItem (string message)
    {
        Message = message;
    }
}

public void Example()
{
    BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();

    // Start queue listener...
    CancellationTokenSource canceller = new CancellationTokenSource();
    Task listener = Task.Factory.StartNew(() =>
        {
            while (!canceller.Token.IsCancellationRequested)
            {
                LogItem item;
                // Wait up to 100 ms for an item rather than spinning on an empty queue.
                if (_queue.TryTake(out item, 100))
                    Console.WriteLine(item.Message);
            }
        },
    canceller.Token, 
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    // Add some log messages in parallel...
    Parallel.Invoke(
        () => { _queue.Add(new LogItem("Log from task A")); },
        () => { 
            _queue.Add(new LogItem("Log from task B")); 
            _queue.Add(new LogItem("Log from task B1")); 
        },
        () => { _queue.Add(new LogItem("Log from task C")); },
        () => { _queue.Add(new LogItem("Log from task D")); });

    // Pretend to do other things...
    Thread.Sleep(1000);

    // Shut down the listener...
    canceller.Cancel();
    listener.Wait();
}
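
If you would rather not poll a cancellation flag, one alternative is to let the consumer block on GetConsumingEnumerable() and signal shutdown with CompleteAdding(), so that everything already queued is written before the listener exits. The following is only a minimal sketch under those assumptions: the class name DrainingLoggerSketch is made up for illustration, and plain strings stand in for LogItem.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical name, used only for this illustration.
public static class DrainingLoggerSketch
{
    public static void Main()
    {
        var queue = new BlockingCollection<string>();

        // Single consumer: GetConsumingEnumerable blocks while the queue is empty
        // and finishes once CompleteAdding has been called and the queue is drained.
        Task listener = Task.Factory.StartNew(() =>
        {
            foreach (string message in queue.GetConsumingEnumerable())
                Console.WriteLine(message);
        }, TaskCreationOptions.LongRunning);

        // Multiple producers; Parallel.Invoke returns only after all of them finish.
        Parallel.Invoke(
            () => queue.Add("Log from task A"),
            () => { queue.Add("Log from task B"); queue.Add("Log from task B1"); },
            () => queue.Add("Log from task C"));

        // No more items will arrive; the listener exits after writing what is left.
        queue.CompleteAdding();
        listener.Wait();
    }
}
```

As before, the relative order of A, B and C varies from run to run, but B is added before B1 by the same producer and so is written first.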
answered Oct 23 '22 20:10 by Ade Miller


I know this answer is about a year late, but take a look at MSDN, which shows how to create a LimitedConcurrencyLevelTaskScheduler derived from the TaskScheduler class. Limiting the concurrency to a single task should then process your tasks in the order they are queued:

LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(lcts);

factory.StartNew(()=> 
{
   // your code
});
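
LimitedConcurrencyLevelTaskScheduler is sample code that you have to copy into your project rather than a framework type. If you can target .NET 4.5 or later (an assumption, since the question is about 4.0), the built-in ConcurrentExclusiveSchedulerPair exposes an ExclusiveScheduler that runs at most one task at a time. The sketch below uses a made-up class name and made-up work items; it shows the one-at-a-time behavior, and I would not rely on the start order being a documented guarantee.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical name, used only for this illustration.
public static class ExclusiveSchedulerSketch
{
    public static void Main()
    {
        // Tasks queued to the exclusive scheduler never run concurrently with each other.
        var pair = new ConcurrentExclusiveSchedulerPair();
        var factory = new TaskFactory(pair.ExclusiveScheduler);

        var tasks = new Task[3];
        for (int i = 0; i < tasks.Length; i++)
        {
            int id = i; // copy the loop variable so each lambda captures its own value
            tasks[i] = factory.StartNew(() => Console.WriteLine("work item " + id));
        }

        Task.WaitAll(tasks);
    }
}
```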
answered Oct 23 '22 20:10 by Barry