 

Reactive Framework as Message queue using BlockingCollection

I've been doing some work lately with the Reactive Framework and have been absolutely loving it so far. I'm looking at replacing a traditional polling message queue with some filtered IObservables to clean up my server operations. In the old way, I dealt with messages coming into the server like so:

// Start spinning the process message loop
Task.Factory.StartNew(() =>
{
    while (true)
    {
        // Take() blocks until a command is available
        Command command = m_CommandQueue.Take();
        ProcessMessage(command);
    }
}, TaskCreationOptions.LongRunning);

This results in a dedicated thread that loops continuously, taking commands from clients off the queue and handing them to the ProcessMessage method, where a series of if/else-if statements determines the type of each command and delegates work based on that type.

I am replacing this with an event-driven system using Reactive, for which I've written the following code:

private BlockingCollection<BesiegedMessage> m_MessageQueue = new BlockingCollection<BesiegedMessage>();
private IObservable<BesiegedMessage> m_MessagePublisher;

// (in the server's startup code)
m_MessagePublisher = m_MessageQueue
    .GetConsumingEnumerable()
    .ToObservable(TaskPoolScheduler.Default);

// All generic server messages (containing no properties) will be processed here
IDisposable genericServerMessageSubscriber = m_MessagePublisher
    .Where(message => message is GenericServerMessage)
    .Subscribe(message =>
    {
        // do something with the generic server message here
    });

My question: while this works, is it good practice to use a BlockingCollection as the backing store for an IObservable like this? I don't see Take() being called anywhere, which makes me think the messages will pile up in the queue without being removed after they have been processed.

Would it be more efficient to look into Subjects as the backing collection to drive the filtered IObservables that will be picking up these messages? Is there anything else I'm missing here that might benefit the architecture of this system?

Jesse Carter, asked Apr 02 '13 19:04


3 Answers

Here's something pulled directly from my posterior - any real solution would be very much dependent on your actual usage, but here's "The cheapest pseudo Message Queue system ever":

Thoughts/motivations:

  • Deliberate exposure of IObservable<T> such that subscribers can do any filtering/cross subscriptions they want to
  • The overall Queue is typeless, but Register and Publish are type-safe(ish)
  • YMMV with the Publish() where it is - try experimenting with moving it around
  • Generally Subject is a no-no, although in this case it does make for some SIMPLE code.
  • One could "internalize" the registration to actually do the subscription as well, but then the queue would need to manage the IDisposables created - bah, let your consumers deal with it!

The Code:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class TheCheapestPubSubEver
{
    private readonly Subject<object> _inner = new Subject<object>();

    public IObservable<T> Register<T>()
    {
        return _inner.OfType<T>().Publish().RefCount();
    }

    public void Publish<T>(T message)
    {
        _inner.OnNext(message);
    }
}

Usage:

void Main()
{
    var queue = new TheCheapestPubSubEver();

    var ofString = queue.Register<string>();
    var ofInt = queue.Register<int>();

    using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}", i)))
    using(ofString.Subscribe(s => Console.WriteLine("A string! {0}", s)))
    {
        queue.Publish("Foo");
        queue.Publish(1);
        Console.ReadLine();
    }
}

Output:

A string! Foo
An int! 1

HOWEVER, this doesn't strictly enforce "consuming consumers" - multiple Registers of a specific type would result in multiple observer calls - that is:

var queue = new TheCheapestPubSubEver();

var ofString = queue.Register<string>();
var anotherOfString = queue.Register<string>();
var ofInt = queue.Register<int>();

using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}", i)))
using(ofString.Subscribe(s => Console.WriteLine("A string! {0}", s)))
using(anotherOfString.Subscribe(s => Console.WriteLine("Another string! {0}", s)))
{
    queue.Publish("Foo");
    queue.Publish(1);
    Console.ReadLine();
}

Results in:

A string! Foo
Another string! Foo
An int! 1
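If you want Register itself to refuse a second consumer for the same type, one option is to track which types have been registered. The sketch below is a hypothetical variant (SingleRegistrationPubSub is not part of the answer above) built on the same Subject<object> plus OfType<T>() idea, and it assumes the System.Reactive NuGet package. It only guards Register: a returned observable can still be subscribed more than once, and registering both a base type and a derived type will still deliver one message to both.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical variant of TheCheapestPubSubEver that allows only one
// Register<T>() call per exact type T.
public class SingleRegistrationPubSub
{
    private readonly Subject<object> _inner = new Subject<object>();
    private readonly HashSet<Type> _registered = new HashSet<Type>();

    public IObservable<T> Register<T>()
    {
        lock (_registered)
        {
            // HashSet.Add returns false if this exact type was already registered.
            if (!_registered.Add(typeof(T)))
                throw new InvalidOperationException(
                    "A consumer for " + typeof(T).Name + " is already registered.");
        }
        return _inner.OfType<T>();
    }

    public void Publish<T>(T message)
    {
        _inner.OnNext(message);
    }
}
```

With this version, the second `queue.Register<string>()` call in the example above would throw instead of silently creating another observer.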
JerKimball, answered Nov 16 '22 01:11


Here is a complete worked example, tested under Visual Studio 2012.

  1. Create a new C# console app.
  2. Right click on your project, select "Manage NuGet Packages", and add "Reactive Extensions - Main Library".

Add this C# code:

using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace DemoRX
{
    class Program
    {
        static void Main(string[] args)
        {
            BlockingCollection<string> myQueue = new BlockingCollection<string>();

            IObservable<string> ob = myQueue
                .GetConsumingEnumerable()
                .ToObservable(TaskPoolScheduler.Default);

            ob.Subscribe(p =>
            {
                // This handler will get called whenever
                // anything appears on myQueue in the future.
                Console.WriteLine("Consuming: {0}", p);
            });

            // Now, adding items to myQueue will trigger the item to be consumed
            // in the predefined handler.
            myQueue.Add("a");
            myQueue.Add("b");
            myQueue.Add("c");
            Console.WriteLine("[any key to exit]");
            Console.ReadKey();
        }
    }
}

You will see this on the console:

[any key to exit]
Consuming: a
Consuming: b
Consuming: c

The really nice thing about using RX is that you can use the full power of LINQ to filter out any unwanted messages. For example, add a .Where clause to filter by "a", and observe what happens:

ob.Where(o => o == "a").Subscribe(p =>
{
    // This will now only get called for items equal to "a".
    Console.WriteLine("Consuming: {0}", p);
});
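Applied to the question's scenario, the same LINQ filtering can replace the if/else-if type checks with one OfType<T>() subscription per message type. This is a sketch: the message classes below are hypothetical stand-ins for the asker's BesiegedMessage types (ChatMessage is invented for illustration), and it assumes the System.Reactive NuGet package. It uses an in-memory collection as the source so the result is deterministic; with a GetConsumingEnumerable() source, each extra subscription would start its own consuming enumerator, so the stream would need to be shared first (for example with Publish()).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical message hierarchy modelled on the question's BesiegedMessage types.
public abstract class BesiegedMessage { }
public sealed class GenericServerMessage : BesiegedMessage { }
public sealed class ChatMessage : BesiegedMessage
{
    public readonly string Text;
    public ChatMessage(string text) { Text = text; }
}

public static class TypeRoutingDemo
{
    // Routes each message to a handler chosen by its runtime type; returns a log of what ran.
    public static List<string> Route(IEnumerable<BesiegedMessage> messages)
    {
        var log = new List<string>();

        // Without a scheduler argument, ToObservable enumerates synchronously on the
        // subscribing thread, so each Subscribe call below runs to completion before returning.
        IObservable<BesiegedMessage> stream = messages.ToObservable();

        // One subscription per message type takes the place of the if/else-if chain.
        using (stream.OfType<GenericServerMessage>().Subscribe(_ => log.Add("generic")))
        using (stream.OfType<ChatMessage>().Subscribe(m => log.Add("chat: " + m.Text)))
        {
            // Nothing to wait for: both subscriptions have already completed.
        }
        return log;
    }
}
```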

Philosophical notes

The advantage of this method over starting up a dedicated thread to poll the queue is that you don't have to worry about disposing of the thread properly when the program exits. This means you don't have to bother with IDisposable or CancellationToken (which is always required when dealing with a BlockingCollection, or else your program might hang on exit with a thread that refuses to die).

Believe me, it's not as easy as you think to write completely robust code to consume events coming out of a BlockingCollection. I much prefer the RX method shown above, as it's cleaner, more robust, needs less code, and lets you filter using LINQ.
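For comparison, this is roughly what the dedicated-thread version needs in order to shut down cleanly. It is a BCL-only sketch, and QueueConsumer is a hypothetical name: CompleteAdding() lets the consuming loop finish after draining the queue, and a CancellationToken covers the case where you want to stop without draining.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical consumer that owns a dedicated thread and can stop it cleanly.
public sealed class QueueConsumer : IDisposable
{
    private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Action<string> _handler;
    private readonly Task _worker;
    private bool _disposed;

    public QueueConsumer(Action<string> handler)
    {
        _handler = handler;
        _worker = Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
    }

    public void Add(string item)
    {
        _queue.Add(item);
    }

    private void Consume()
    {
        try
        {
            // Blocks while the queue is empty. The loop ends once CompleteAdding() has
            // been called and the queue is drained, or throws if the token is cancelled.
            foreach (string item in _queue.GetConsumingEnumerable(_cts.Token))
                _handler(item);
        }
        catch (OperationCanceledException)
        {
            // Stopped via Cancel(); anything still queued is abandoned.
        }
    }

    // Hard stop: abandon queued items. Call before Dispose(), not after.
    public void Cancel()
    {
        _cts.Cancel();
    }

    // Graceful stop: finish the items already queued, then wait for the thread to exit.
    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;
        _queue.CompleteAdding();
        _worker.Wait();
        _cts.Dispose();
        _queue.Dispose();
    }
}
```

Wrapping the consumer in a `using` block is then enough to guarantee that the thread has exited by the end of the block.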

Latency

I was surprised at how fast this method is.

On my Xeon X5650 @ 2.67 GHz, it takes 5 seconds to process 10 million events, which works out to approximately 0.5 microseconds per event. It took 4.5 seconds to put the items into the BlockingCollection, so RX was taking them out and processing them almost as fast as they were going in.
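If you want to reproduce this kind of measurement, a minimal Stopwatch harness is sketched below (QueueThroughput is a hypothetical name; it assumes the System.Reactive NuGet package). It times the Add() loop separately from the point where the subscriber has seen the last item; the per-event figure is the total time divided by the item count, e.g. 5 s / 10,000,000 = 0.5 µs. Numbers will vary with hardware, runtime and Rx version.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class QueueThroughput
{
    // Pushes `count` items through BlockingCollection -> ToObservable -> Subscribe.
    // addTime: time spent in the Add() loop.
    // totalTime: time until the subscriber has received OnCompleted.
    // Returns the number of items the subscriber processed.
    public static long Run(int count, out TimeSpan addTime, out TimeSpan totalTime)
    {
        var queue = new BlockingCollection<int>();
        long processed = 0; // only written on the Rx consumer thread

        using (var completed = new ManualResetEventSlim(false))
        using (queue.GetConsumingEnumerable()
                    .ToObservable(TaskPoolScheduler.Default)
                    .Subscribe(_ => processed++, () => completed.Set()))
        {
            var sw = Stopwatch.StartNew();
            for (int i = 0; i < count; i++)
                queue.Add(i);
            addTime = sw.Elapsed;

            queue.CompleteAdding(); // lets the consuming enumerable finish
            completed.Wait();       // wait until the subscriber has seen every item
            totalTime = sw.Elapsed;
        }
        return processed;
    }
}
```

Dividing `totalTime.TotalMilliseconds * 1000` by `count` gives microseconds per event.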

Threading

In all of my tests, RX only spun up one thread to handle the tasks on the queue.

This means that we have a very nice pattern: we can collect incoming data from multiple threads into a shared queue, then use RX to process the queue contents on a single thread (which is, by definition, thread safe).

This pattern eliminates a huge number of headaches when dealing with multithreaded code, by decoupling the producer and consumer of data via a queue, where the producer could be multi-threaded and the consumer is single-threaded and thus thread-safe. This is the concept that makes Erlang so robust. For more information on this pattern, see Multi-threading made ridiculously simple.
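The multi-producer, single-consumer pattern can be checked with a small sketch like the one below (SingleConsumerDemo is a hypothetical name; it assumes the System.Reactive NuGet package). Several producer tasks add to one queue, and the subscriber records which thread it runs on. What Rx guarantees is that OnNext calls on one subscription are never made concurrently; the single thread ID you should see reflects ToObservable running its loop on a long-running task when given TaskPoolScheduler, so treat that part as observed behavior rather than a contract.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SingleConsumerDemo
{
    // Several producer tasks add to one queue; one Rx subscription consumes it.
    // Returns how many distinct threads ran the subscriber; `processed` is the item count.
    public static int ConsumerThreadCount(int producers, int itemsPerProducer, out int processed)
    {
        var queue = new BlockingCollection<int>();
        var consumerThreads = new HashSet<int>(); // touched only inside OnNext, never concurrently
        int count = 0;                            // plain increment, no Interlocked needed

        using (var completed = new ManualResetEventSlim(false))
        using (queue.GetConsumingEnumerable()
                    .ToObservable(TaskPoolScheduler.Default)
                    .Subscribe(_ =>
                    {
                        consumerThreads.Add(Thread.CurrentThread.ManagedThreadId);
                        count++;
                    },
                    () => completed.Set()))
        {
            Task[] producerTasks = Enumerable.Range(0, producers)
                .Select(p => Task.Run(() =>
                {
                    for (int i = 0; i < itemsPerProducer; i++)
                        queue.Add(p * itemsPerProducer + i);
                }))
                .ToArray();

            Task.WaitAll(producerTasks);
            queue.CompleteAdding();
            completed.Wait();
        }

        processed = count;
        return consumerThreads.Count;
    }
}
```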

Contango, answered Nov 16 '22 03:11


I haven't used BlockingCollection in this context - so I'm 'conjecturing' - you should run it to prove or disprove this.

BlockingCollection might only further complicate things here (or provide little help). Take a look at this post from Jon - simply to confirm. GetConsumingEnumerable will give you a 'per subscriber' enumerable, and each of them removes items from the same collection, eventually draining it. That's something to keep in mind with Rx.
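Both points (and the question's worry about messages piling up without Take()) can be checked without Rx using the BCL-only sketch below; ConsumingEnumerableDemo is a hypothetical name. GetConsumingEnumerable() removes each item as it yields it, and two enumerators over the same collection compete for items instead of each seeing all of them.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ConsumingEnumerableDemo
{
    // Drains a pre-filled collection with one consumer; returns how many items are left.
    public static int RemainingAfterDrain(int count)
    {
        var queue = new BlockingCollection<int>();
        for (int i = 0; i < count; i++)
            queue.Add(i);
        queue.CompleteAdding(); // without this, the loop below would block forever once empty

        foreach (int item in queue.GetConsumingEnumerable())
        {
            // Each item has already been removed from the collection when it is yielded.
        }
        return queue.Count;
    }

    // Two consumers enumerate the same collection at the same time.
    // Returns the items each one received.
    public static List<int>[] SplitBetweenConsumers(int count)
    {
        var queue = new BlockingCollection<int>();
        Task<List<int>> first = Task.Run(() => queue.GetConsumingEnumerable().ToList());
        Task<List<int>> second = Task.Run(() => queue.GetConsumingEnumerable().ToList());

        for (int i = 0; i < count; i++)
            queue.Add(i);
        queue.CompleteAdding();

        return new[] { first.Result, second.Result };
    }
}
```

RemainingAfterDrain returns 0, and the two lists from SplitBetweenConsumers never share an item; how the items are divided between them depends on thread timing.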

Also, IEnumerable<>.ToObservable further flattens out the 'source'. The way it works (you can look up the source - I'd recommend that with Rx more than anything), each subscription creates its own enumerator, so every subscriber gets its own version of the feed. I'm really not sure how that pans out in an Observable scenario like this one.
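To make the per-subscriber behavior concrete, the sketch below (SharedFeedDemo is a hypothetical name; it assumes the System.Reactive NuGet package) attaches two subscribers to the same BlockingCollection-backed observable. Without sharing, each Subscribe call runs its own GetConsumingEnumerable() loop, so the two subscribers split the messages between them. Routing them through Publish() and Connect() means one enumerator feeds both, so each subscriber sees every message.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class SharedFeedDemo
{
    // share == false: each Subscribe starts its own consuming enumerator, so items are split.
    // share == true: Publish() puts one enumerator behind both subscribers.
    // Returns how many items each of the two subscribers received.
    public static int[] CountsPerSubscriber(bool share, int items)
    {
        var queue = new BlockingCollection<int>();
        IObservable<int> source = queue.GetConsumingEnumerable()
                                       .ToObservable(TaskPoolScheduler.Default);
        IConnectableObservable<int> published = source.Publish();
        IObservable<int> feed = share ? published : source;

        var counts = new int[2]; // each slot is written by one subscriber, serially
        using (var done = new CountdownEvent(2))
        {
            IDisposable first = feed.Subscribe(_ => counts[0]++, () => done.Signal());
            IDisposable second = feed.Subscribe(_ => counts[1]++, () => done.Signal());
            // Connect only after both subscribers are attached, so neither misses items.
            IDisposable connection = share ? published.Connect() : Disposable.Empty;

            for (int i = 0; i < items; i++)
                queue.Add(i);
            queue.CompleteAdding();
            done.Wait(); // both subscribers have received OnCompleted

            connection.Dispose();
            first.Dispose();
            second.Dispose();
        }
        return counts;
    }
}
```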

Anyhow - if you want to provide app-wide messages - IMO you'd need to introduce Subject or state in some other form (e.g. Publish etc.). And in that sense, I don't think BlockingCollection will help any - but again, it's best that you try it out yourself.

Note (a philosophical one)

If you want to combine message types, or combine different sources - e.g. in a more 'real world' scenario - it gets more complex. And it gets quite interesting I must say.

Aim to have them 'rooted' in a single shared stream (and avoid the problem Jer rightly pointed out).

I'd recommend that you don't try to avoid using Subject. For what you need, it's your friend - regardless of all the discussions about avoiding state and how Subject is bad, you effectively have state (and you need state). Rx kicks in 'after the fact', so you enjoy its benefits regardless.
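One thing to watch when a Subject is the shared stream and several threads publish into it: Subject<T> does not serialize concurrent OnNext calls, so observers could be invoked on two threads at once. A minimal sketch of a single shared stream that guards OnNext with a lock is below; SerializedHub and SerializedHubDemo are hypothetical names, and it assumes the System.Reactive NuGet package. Rx also has a Synchronize() operator for serializing notifications; the explicit lock is used here only to keep the mechanism visible.

```csharp
using System;
using System.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical single shared stream that many threads can publish into.
public sealed class SerializedHub<T>
{
    private readonly Subject<T> _subject = new Subject<T>();
    private readonly object _gate = new object();

    public IObservable<T> Messages
    {
        get { return _subject; }
    }

    public void Publish(T message)
    {
        // Subject<T> does not serialize concurrent OnNext calls; the lock ensures
        // observers are never invoked on two threads at the same time.
        lock (_gate)
        {
            _subject.OnNext(message);
        }
    }
}

public static class SerializedHubDemo
{
    // Publishes from several tasks at once; returns how many messages the observer counted.
    public static int CountFromProducers(int producers, int perProducer)
    {
        var hub = new SerializedHub<int>();
        int seen = 0; // plain increment: safe only because deliveries are serialized

        using (hub.Messages.Subscribe(_ => seen++))
        {
            Task[] tasks = Enumerable.Range(0, producers)
                .Select(p => Task.Run(() =>
                {
                    for (int i = 0; i < perProducer; i++)
                        hub.Publish(i);
                }))
                .ToArray();
            Task.WaitAll(tasks);
        }
        return seen;
    }
}
```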

I encourage you to go that way, as I love how it turned out.

NSGaga-mostly-inactive, answered Nov 16 '22 02:11