 

How to use IObservable/IObserver with ConcurrentQueue or ConcurrentStack

I realized that when I am trying to process items in a concurrent queue using multiple threads, while other threads may still be putting items into it, the ideal solution would be to use the Reactive Extensions together with the concurrent data structures.

My original question is at:

While using ConcurrentQueue, trying to dequeue while looping through in parallel

So I am curious whether there is any way to have a LINQ (or PLINQ) query that will continuously dequeue items as they are put into the queue.

I am trying to get this to work in a way where I can have any number of producers pushing into the queue and a limited number of threads doing the processing, so I don't overload the database.

If I could use the Rx framework, then I expect I could just start it, and if 100 items are placed into the queue within 100 ms, the 20 threads that are part of the PLINQ query would simply work through the queue. A rough sketch of the shape I am after follows the list of technologies below.

There are three technologies I am trying to work together:

  1. Rx Framework (Reactive LINQ)
  2. PLINQ
  3. System.Collections.Concurrent structures
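
Here is a rough, hypothetical sketch of the shape I mean (the names and the work inside the loop are made up; the real work would be a database call). Producers enqueue while a fixed pool of workers drains the queue, and it is exactly this hand-rolled polling loop that I am hoping Rx/PLINQ can replace:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Sketch
{
    static void Main()
    {
        var queue = new ConcurrentQueue<int>();

        // Producer: in reality there would be n of these.
        var producers = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 100; i++)
            {
                queue.Enqueue(i);
            }
        });

        // A fixed pool of 20 workers keeps draining the queue until the
        // producers are done and the queue is empty. This busy-waits, which
        // is what I would like Rx/PLINQ to handle for me instead.
        Parallel.For(0, 20, _ =>
        {
            int item;
            while (!producers.IsCompleted || !queue.IsEmpty)
            {
                if (queue.TryDequeue(out item))
                {
                    Console.WriteLine("Processed {0}", item); // stand-in for the DB call
                }
            }
        });
    }
}
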
James Black, asked Jun 13 '10

2 Answers

Drew is right: ConcurrentQueue, even though it sounds perfect for the job, is actually the underlying data structure that BlockingCollection uses by default. That seems very back to front to me too. Check out chapter 7 of this book* (http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1); it explains how to use BlockingCollection with multiple producers and multiple consumers each taking items off the "queue". You will want to look at the GetConsumingEnumerable() method and possibly just call .ToObservable() on that.

*the rest of the book is pretty average.

edit:

Here is a sample program that I think does what you want:

// Requires Rx. This is the Rx v1-era API; on later Rx versions,
// TaskPoolScheduler.Default replaces Scheduler.TaskPool.
using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    // Gate so that all producer threads start at the same time.
    private static ManualResetEvent _mre = new ManualResetEvent(false);

    static void Main(string[] args)
    {
        var theQueue = new BlockingCollection<string>();

        // Each consumer gets its own consuming enumerable, so the three
        // subscriptions compete for items: every queued item is handed to
        // exactly one consumer.
        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));

        // Spin up three producer threads, then release them all at once.
        LoadQueue(theQueue, "Producer A");
        LoadQueue(theQueue, "Producer B");
        LoadQueue(theQueue, "Producer C");

        _mre.Set();

        Console.WriteLine("Processing now....");

        Console.ReadLine();
    }

    private static void ProcessNewValue(string value, string consumerName, int delay)
    {
        // Simulate work of varying cost per consumer.
        Thread.SpinWait(delay);
        Console.WriteLine("{1} consuming {0}", value, consumerName);
    }

    private static void LoadQueue(BlockingCollection<string> target, string prefix)
    {
        // Each producer waits for the gate, then adds 100 items.
        var thread = new Thread(() =>
                                    {
                                        _mre.WaitOne();
                                        for (int i = 0; i < 100; i++)
                                        {
                                            target.Add(string.Format("{0} {1}", prefix, i));
                                        }
                                    });
        thread.Start();
    }
}
Lee Campbell, answered Oct 22 '22

I don't know how best to accomplish this with Rx, but I would recommend just using BlockingCollection<T> and the producer-consumer pattern. Your main thread adds items into the collection, which uses ConcurrentQueue<T> underneath by default. Then you spin up a separate Task, ahead of that, which uses Parallel.ForEach over the BlockingCollection<T> to process as many items from the collection concurrently as makes sense for the system. You will probably also want to look into using the GetConsumingPartitioner method of the ParallelExtensions library in order to be most efficient, since the default partitioner will create more overhead than you want in this case. You can read more about this from this blog post.

When the main thread is finished adding items, you call CompleteAdding on the BlockingCollection<T> and Task.Wait on the Task you spun up to wait for all the consumers to finish processing the items in the collection.
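
Here is a minimal, hypothetical sketch of that pattern (the item type, the work inside the loop, and the degree of parallelism are made up for illustration). It uses GetConsumingEnumerable() for simplicity; the GetConsumingPartitioner extension from the ParallelExtensionsExtras samples would avoid the default partitioner's chunk buffering.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class ProducerConsumerSketch
{
    static void Main()
    {
        var collection = new BlockingCollection<string>();

        // Consumer task: Parallel.ForEach pulls items as they become available,
        // with a bounded number of workers so the database is not overloaded.
        var consumer = Task.Factory.StartNew(() =>
            Parallel.ForEach(
                collection.GetConsumingEnumerable(),
                new ParallelOptions { MaxDegreeOfParallelism = 4 }, // hypothetical limit
                item => Console.WriteLine("Processing {0}", item)));

        // Producer: the main thread adds items to the collection.
        for (int i = 0; i < 100; i++)
        {
            collection.Add("item " + i);
        }

        // Signal that no more items will be added, then wait for the consumers
        // to drain the collection.
        collection.CompleteAdding();
        consumer.Wait();
    }
}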

Drew Marsh, answered Oct 22 '22