Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Reading from multiple queues, RabbitMQ

I am new to RabbitMQ. I want to be able to handle reading messages without blocking when there are multiple queues (to read from). Any inputs on how I can do that?

//Edit 1

/// <summary>
/// RabbitMQ-backed <see cref="IMessageBus"/>. Publishes serialized
/// <c>Measurement</c> objects through the <c>amq.direct</c> exchange and reads
/// them back from the single queue most recently passed to
/// <see cref="subscribeToQueue"/>.
/// </summary>
public class Rabbit : IMessageBus, IDisposable
{
    ConnectionFactory factory = null;
    IConnection connection = null;
    IModel channel = null;
    Subscription sub = null;

    // Set once disposeAll() has run so that repeated calls are harmless.
    bool disposed = false;

    /// <summary>
    /// Declares and binds the queue named after <c>m1.id</c>, then publishes the
    /// serialized measurement to it once via <c>amq.direct</c>.
    /// </summary>
    /// <param name="m1">Measurement to send; its <c>id</c> is used as the queue name.</param>
    /// <exception cref="ArgumentNullException"><paramref name="m1"/> is null.</exception>
    public void writeMessage( Measurement m1 ) {
        if (m1 == null) {
            throw new ArgumentNullException("m1");
        }

        byte[] body = Measurement.AltSerialize( m1 );

        // The placeholder index must be {0}: only one argument is supplied, so the
        // former {1} raised a FormatException before anything was published.
        Console.WriteLine("Sending message to queue {0} via the amq.direct exchange.", m1.id);

        string finalQueue = publishToQueue( m1.id );
        channel.BasicPublish("amq.direct", finalQueue, null, body);

        Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id);
    }

    /// <summary>
    /// Declares a queue and binds it to <c>amq.direct</c>, using the queue name
    /// as the routing key.
    /// </summary>
    /// <param name="firstQueueName">Requested queue name.</param>
    /// <returns>The queue name returned by <c>QueueDeclare</c>.</returns>
    public string publishToQueue(string firstQueueName) {
        Console.WriteLine("Creating a queue and binding it to amq.direct");
        string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null);
        channel.QueueBind(queueName, "amq.direct", queueName, null);
        Console.WriteLine("Done.  Created queue {0} and bound it to amq.direct.\n", queueName);
        return queueName;
    }

    /// <summary>
    /// Waits for the next delivery on the current subscription, acknowledges it
    /// and returns the deserialized measurement.
    /// </summary>
    /// <returns>
    /// The received measurement, or a default-constructed <c>Measurement</c> when
    /// <c>Next()</c> yields no delivery.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// <see cref="subscribeToQueue"/> has not been called yet.
    /// </exception>
    public Measurement readMessage() {
        if (sub == null) {
            throw new InvalidOperationException("subscribeToQueue must be called before readMessage.");
        }

        Console.WriteLine("Receiving message...");
        Measurement m = new Measurement();

        // Next() is called directly rather than via foreach + break.
        // NOTE(review): Subscription appears to be its own IDisposable enumerator, so
        // leaving a foreach early would dispose (close) it — confirm against the client docs.
        BasicDeliverEventArgs ev = sub.Next();
        if (ev != null) {
            m = Measurement.AltDeSerialize(ev.Body);
            //m.id = //get the id here, from sub
            // Acknowledge the delivery; the old code broke out of its loop before
            // reaching Ack(), so the message was never acknowledged.
            sub.Ack();
        }

        Console.WriteLine("Done.\n");
        return m;
    }

    /// <summary>
    /// Starts consuming from <paramref name="queueName"/>, replacing (and closing)
    /// any previous subscription held by this instance.
    /// </summary>
    /// <param name="queueName">Queue to consume from.</param>
    public void subscribeToQueue(string queueName )
    {
        if (sub != null) {
            // Do not leave the previous consumer registered on the channel.
            sub.Close();
        }
        sub = new Subscription(channel, queueName);
    }

    // Static backing field: the value is shared by every Rabbit instance.
    public static string MsgSysName;

    /// <summary>Name of the messaging system; reads and writes the static <see cref="MsgSysName"/>.</summary>
    public string MsgSys
    {
        get
        {
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    /// <summary>
    /// Connects to the broker on <c>localhost</c>, opens one channel and records
    /// the messaging-system name.
    /// </summary>
    /// <param name="_msgSys">Value stored in <see cref="MsgSys"/>.</param>
    public Rabbit(string _msgSys) //Constructor
    {
        factory = new ConnectionFactory();
        factory.HostName = "localhost";
        connection = factory.CreateConnection();
        channel = connection.CreateModel();

        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    /// <summary>
    /// Closes the subscription, the channel and the connection, in that order.
    /// Named to match <c>MessageHandler.disposeAll</c>. Safe to call more than once.
    /// </summary>
    /// <remarks>
    /// Replaces the former finalizer, which disposed the connection from the
    /// finalizer thread (at an unpredictable time) and never closed the channel.
    /// </remarks>
    public void disposeAll()
    {
        if (disposed) {
            return;
        }
        disposed = true;

        if (sub != null) {
            sub.Close();
            sub = null;
        }
        if (channel != null) {
            channel.Dispose();
        }
        if (connection != null) {
            connection.Dispose();
        }
        System.Console.WriteLine("\nDestroying RABBIT");
    }

    /// <summary>Releases all broker resources; equivalent to <see cref="disposeAll"/>.</summary>
    public void Dispose()
    {
        disposeAll();
    }
}

//Edit 2

// Every subscription created by subscribeToQueue; readMessage walks this list in order.
private List<Subscription> subscriptions = new List<Subscription>();
    // The subscription most recently created by subscribeToQueue.
    Subscription sub = null;

// Polls every subscription once, in subscription order, and returns the first
// measurement found.
//
// Returns: the deserialized measurement (already acknowledged), or null when no
// subscription produced a delivery within the polling timeout.
//
// Changes from the previous version:
//   * Uses the timed Next(timeout, out result) overload, so an empty queue no longer
//     blocks the call and hides messages waiting on later queues.
//   * Acknowledges the delivery before returning it.
//   * Returns null consistently when nothing was read; it used to return an empty
//     Measurement when there were no subscriptions at all.
//   * Avoids `return` inside a foreach over the subscription.
//     NOTE(review): Subscription appears to be its own IDisposable enumerator, so
//     leaving such a foreach would close it — confirm against the client docs.
public Measurement readMessage()
    {
        // Upper bound, per queue, on how long one call waits for a delivery.
        const int pollTimeoutMs = 100;

        foreach (Subscription element in subscriptions)
        {
            BasicDeliverEventArgs ev;
            if (element.Next(pollTimeoutMs, out ev) && ev != null)
            {
                Measurement m = Measurement.AltDeSerialize( ev.Body );
                element.Ack();
                return m;
            }
        }
        System.Console.WriteLine("No message in the queue(s) at this time.");
        return null;
    }

    // Opens a subscription on queueName over the shared channel, remembers it as the
    // current subscription and appends it to the list that readMessage iterates.
    public void subscribeToQueue(string queueName)
    {
        Subscription created = new Subscription(channel, queueName);
        sub = created;
        subscriptions.Add(created);
    }

//Edit 3

//MessageHandler.cs

/// <summary>
/// Publishes <c>Measurement</c> objects to RabbitMQ queues through the
/// <c>amq.direct</c> exchange and polls any number of subscribed queues for
/// incoming measurements.
/// </summary>
public class MessageHandler
{
    // Upper bound, per queue, on how long readMessage waits for a delivery.
    private const int PollTimeoutMs = 100;

    ConnectionFactory factory = null;
    IConnection connection = null;
    IModel channel = null;

    // Every subscription opened through subscribeToQueue, in creation order.
    private List<Subscription> subscriptions = new List<Subscription>();
    // The subscription most recently created by subscribeToQueue.
    Subscription sub = null;

    /// <summary>
    /// Declares and binds the queue named after <c>m1.id</c>, then publishes the
    /// serialized measurement with <c>m1.id</c> as the routing key.
    /// </summary>
    /// <param name="m1">Measurement to send; its <c>id</c> names the target queue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="m1"/> is null.</exception>
    public void writeMessage ( Measurement m1 )
    {
        if (m1 == null)
        {
            throw new ArgumentNullException("m1");
        }

        byte[] body = Measurement.AltSerialize( m1 );
        //declare a queue if it doesn't exist
        publishToQueue(m1.id);

        channel.BasicPublish("amq.direct", m1.id, null, body);
        Console.WriteLine("\n  [x] Sent to queue {0}.", m1.id);
    }

    /// <summary>
    /// Declares <paramref name="queueName"/> and binds it to <c>amq.direct</c>
    /// using the queue name as the binding key.
    /// </summary>
    /// <param name="queueName">Queue to declare and bind.</param>
    public void publishToQueue(string queueName)
    {
        string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null);
        // The binding key has to equal the routing key used by writeMessage (the queue
        // name). It used to be "", which a direct exchange only matches against messages
        // published with an empty routing key, so nothing reached the queue.
        channel.QueueBind(finalQueueName, "amq.direct", finalQueueName, null);
    }

    /// <summary>
    /// Polls each subscription once, in subscription order, and returns the first
    /// measurement found. The delivery is acknowledged and the measurement's
    /// <c>id</c> is set to the name of the queue it came from.
    /// </summary>
    /// <returns>
    /// The received measurement, or null when no subscribed queue produced a
    /// delivery within the polling timeout (including when nothing is subscribed).
    /// </returns>
    public Measurement readMessage()
    {
        foreach(Subscription element in subscriptions)
        {
            if( element.QueueName == null)
            {
                continue;
            }

            // Timed wait: the parameterless Next() blocks until a message arrives, so an
            // empty first queue used to hide messages waiting on later queues.
            BasicDeliverEventArgs ev;
            if( element.Next(PollTimeoutMs, out ev) && ev != null)
            {
                Measurement m = Measurement.AltDeSerialize( ev.Body );
                m.id = element.QueueName;
                // Acknowledge only when a delivery was actually received.
                element.Ack();
                return m;
            }
        }
        System.Console.WriteLine("No message in the queue(s) at this time.");
        return null;
    }

    /// <summary>
    /// Opens a subscription on <paramref name="queueName"/> and adds it to the set
    /// polled by <see cref="readMessage"/>.
    /// </summary>
    /// <param name="queueName">Queue to consume from.</param>
    public void subscribeToQueue(string queueName)
    {
        sub = new Subscription(channel, queueName);
        subscriptions.Add(sub);
    }

    // Static backing field: the value is shared by every MessageHandler instance.
    public static string MsgSysName;

    /// <summary>Name of the messaging system; reads and writes the static <see cref="MsgSysName"/>.</summary>
    public string MsgSys
    {
        get
        {
            return MsgSysName;
        }
        set
        {
            MsgSysName = value;
        }
    }

    /// <summary>
    /// Connects to the broker on <c>localhost</c>, opens one channel and records
    /// the messaging-system name.
    /// </summary>
    /// <param name="_msgSys">Value stored in <see cref="MsgSys"/>.</param>
    public MessageHandler(string _msgSys) //Constructor
    {
        factory = new ConnectionFactory();
        factory.HostName = "localhost";
        connection = factory.CreateConnection();
        channel = connection.CreateModel();

        System.Console.WriteLine("\nMsgSys: RabbitMQ");
        MsgSys = _msgSys;
    }

    /// <summary>
    /// Closes every subscription, then the channel, then the connection.
    /// </summary>
    /// <remarks>
    /// Teardown runs in reverse order of creation; the previous version disposed
    /// the connection first and only afterwards touched the channel and the
    /// subscriptions that depend on it.
    /// </remarks>
    public void disposeAll()
    {
        foreach(Subscription element in subscriptions)
        {
            element.Close();
        }
        subscriptions.Clear();
        sub = null;

        channel.Dispose();
        connection.Dispose();
        System.Console.WriteLine("\nDestroying RABBIT");
    }
}

//App1.cs

using System;
using System.IO;

using UtilityMeasurement;
using UtilityMessageBus;


/// <summary>
/// Publisher demo (App1): builds two test measurements and writes them to
/// RabbitMQ through a <c>MessageHandler</c>.
/// </summary>
public class MainClass
{
    /// <summary>
    /// Creates the handler, prints both test measurements, and for each one asks
    /// for a queue name to declare before publishing the measurement.
    /// </summary>
    public static void Main()
    {
        // `new` was missing here, so the original line did not compile.
        MessageHandler obj1 = new MessageHandler("Rabbit");

        try
        {
            // MsgSysName is not in scope inside MainClass; read the name via the handler.
            System.Console.WriteLine("\nA {0} object is now created.", obj1.MsgSys);

            //Create new Measurement messages
            Measurement m1 = new Measurement("q1", 2345, 23.456);
            Measurement m2 = new Measurement("q2", 222, 33.33);

            System.Console.WriteLine("Test message 1:\n    ID: {0}", m1.id);
            System.Console.WriteLine("    Time: {0}", m1.time);
            System.Console.WriteLine("    Value: {0}", m1.value);

            System.Console.WriteLine("Test message 2:\n    ID: {0}", m2.id);
            System.Console.WriteLine("    Time: {0}", m2.time);
            System.Console.WriteLine("    Value: {0}", m2.value);

            // Ask queue name and store it
            System.Console.WriteLine("\nName of queue to publish to: ");
            string queueName = (System.Console.ReadLine()).ToString();
            obj1.publishToQueue( queueName );

            // Write message to the queue
            obj1.writeMessage( m1 );

            System.Console.WriteLine("\nName of queue to publish to: ");
            string queueName2 = (System.Console.ReadLine()).ToString();
            obj1.publishToQueue( queueName2 );

            obj1.writeMessage( m2 );
        }
        finally
        {
            // Release the channel and connection even when publishing fails.
            obj1.disposeAll();
        }
    }
}

//App2.cs

using System;
using System.IO;

using UtilityMeasurement;
using UtilityMessageBus;

/// <summary>
/// Subscriber demo (App2): picks a message bus by name, subscribes to two
/// queues in turn and prints whatever each read returns.
/// </summary>
public class MainClass
{
    // Prints the prompt, reads a queue name from the console, subscribes the bus
    // to that queue and returns the result of one read.
    private static Measurement SubscribeAndRead(IMessageBus bus, string prompt)
    {
        System.Console.WriteLine(prompt);
        string requestedQueue = (System.Console.ReadLine()).ToString();
        bus.subscribeToQueue( requestedQueue );

        //Read message
        return bus.readMessage();
    }

    // Writes the fields of a measurement to the console; a null measurement prints nothing.
    private static void Report(Measurement received)
    {
        if (received == null)
        {
            return;
        }

        System.Console.WriteLine("\nMessage received from queue {0}:\n    ID: {1}", received.id, received.id);
        System.Console.WriteLine("    Time: {0}", received.time);
        System.Console.WriteLine("    Value: {0}", received.value);
    }

    public static void Main()
    {
        //Asks for the message system
        System.Console.WriteLine("\nEnter name of messageing system: ");
        System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
        string MsgSysName = (System.Console.ReadLine()).ToString();

        //The factory instantiates the bus object for the chosen
        //message system (ex. Rabbit, Zmq, etc)
        IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);

        System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

        //Placeholder measurement, overwritten by the first read
        Measurement m = new Measurement();

        m = SubscribeAndRead(obj1, "Queue name to subscribe to: ");
        Report(m);

        m = SubscribeAndRead(obj1, "Another queue name to subscribe to: ");
        Report(m);

        obj1.disposeAll();
    }
}
like image 822
Demi Avatar asked Jul 14 '11 16:07

Demi


People also ask

How do I read multiple messages on RabbitMQ?

To increase the performance and to consume more messages at a time, do as follows: Open the "RabbitMQ connection" and go to the Event sources tab. In Advanced Settings > "Other Attributes:", add “concurrentConsumers” property. For instance: concurrentConsumers=10.

Can RabbitMQ have multiple queues?

Use multiple queues and consumers Queues are single-threaded in RabbitMQ, and one queue can handle up to about 50 thousand messages. You will achieve better throughput on a multi-core system if you have multiple queues and consumers and if you have as many queues as cores on the underlying node(s).

Is it possible that multiple consumers of a RabbitMQ queue get the same message?

No it's not, single queue/multiple consumers with each consumer handling the same message ID isn't possible. Having the exchange route the message onto into two separate queues is indeed better.


2 Answers

two sources of info:

  1. http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

  2. You should really try to understand the examples first.

    • %Program Files%\RabbitMQ\DotNetClient\examples\src (basic examples)

    • get full working examples from their Mercurial repository (c# projects).

Useful operations to understand:

  • Declare / Assert / Listen / Subscribe / Publish

Re: your question -- there's no reason why you can't have multiple listeners. Or you could subscribe to n routing paths with one listener on an "exchange".

** re: non-blocking **

A typical listener consumes messages one at a time. You can pull them off the queue, or they will automatically be placed close to the consumer in a 'windowed' fashion (defined through quality of service qos parameters). The beauty of the approach is that a lot of hard work is done for you (re: reliability, guaranteed delivery, etc.).

A key feature of RabbitMQ, is that if there is an error in processing, then the message is re-added back into the queue (a fault tolerance feature).

I would need to know more about your situation.

Often if you post to the list I mentioned above, you can get hold of someone on staff at RabbitMQ. They're very helpful.

Hope that helps a little. It's a lot to get your head around at first, but it is worth persisting with.


Q&A

see: http://www.rabbitmq.com/faq.html

Q. Can you subscribe to multiple queues using new Subscription(channel, queueName) ?

Yes. You either use a binding key e.g. abc.*.hij, or abc.#.hij, or you attach multiple bindings. The former assumes that you have designed your routing keys around some kind of principle that makes sense for you (see routing keys in the FAQ). For the latter, you need to bind to more than one queue.

Implementing n-bindings manually. see: http://hg.rabbitmq.com/rabbitmq-dotnet-client/file/default/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

there's not much code behind this pattern, so you could roll your own subscription pattern if wildcards are not enough. you could inherit from this class and add another method for additional bindings... probably this will work or something close to this (untested).

The AMQP spec says that multiple manual bindings are possible: http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.bind

Q. And if so, how can I go through all the subscribed queues and return a message (null when no messages)?

With a subscriber you are notified when a message is available. Otherwise what you are describing is a pull interface where you pull the message down on request. If no messages available, you'll get a null as you'd like. btw: the Notify method is probably more convenient.

Q. Oh, and mind you that that I have all this operations in different methods. I will edit my post to reflect the code

Live code:

this version must use wild cards to subscribe to more than one routing key

n manual routing keys using subscription is left as an exercise for the reader. ;-) I think you were leaning towards a pull interface anyway. btw: pull interfaces are less efficient than notify ones.

        // Consume deliveries from QueueNme by enumerating the subscription.
        using (Subscription sub = new Subscription(ch, QueueNme))
        {
            foreach (BasicDeliverEventArgs ev in sub)
            {
                // Hand the raw message payload to the application's handler.
                Process(ev.Body);

        ...

Note: the foreach uses IEnumerable, and IEnumerable wraps the event that a new message has arrived through the "yield" statement. Effectively it is an infinite loop.

--- UPDATE

AMQP was designed with the idea of keeping the number of TCP connections as low as the number of applications, so that means you can have many channels per connection.

The code in this question (edit 3) tries to use two subscribers with one channel, whereas it should (I believe) be one subscriber per channel per thread to avoid locking issues. Suggestion: use a routing key "wildcard". It is possible to subscribe to more than one distinct queue name with the Java client, but to my knowledge the .NET client does not implement this in the Subscription helper class.

If you really do need two distinct queue names on the same subscription thread, then the following pull sequence is suggested for .net:

        // Pull-style read: request a single message from queueName on a fresh channel.
        using (IModel ch = conn.CreateModel()) {    // btw: no reason to close the channel afterwards IMO
            conn.AutoClose = true;                  // no reason to close the connection either.  Here for completeness.

            ch.QueueDeclare(queueName);
            // A null result is treated below as "the queue had nothing to deliver".
            BasicGetResult result = ch.BasicGet(queueName, false);
            if (result == null) {
                Console.WriteLine("No message available.");
            } else {
                // Acknowledge the fetched delivery, identified by its delivery tag.
                ch.BasicAck(result.DeliveryTag, false);
                Console.WriteLine("Message:");
            }

            return 0;
        }

-- UPDATE 2:

from RabbitMQ list:

"assume that element.Next() is blocking on one of the subscriptions. You could retrieve deliveries from each subscription with a timeout to read past it. Alternatively you could set up a single queue to receive all measurements and retrieve messages from it with a single subscription." (Emile)

What that means is that when the first queue is empty, .Next() blocks waiting for the next message to appear. i.e. the subscriber has a wait-for-next-message built in.

-- UPDATE 3:

under .net, use the QueueingBasicConsumer for consumption from multiple queues.

Actually here's a thread about it to get a feel on usage:

Wait for a single RabbitMQ message with a timeout

-- UPDATE4:

some more info on the .QueueingBasicConsumer

There's example code here.

http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.QueueingBasicConsumer.html

example copied into the answer with a few modifications (see //<-----).

                IModel channel = ...;
            QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
            // The same consumer object is registered on each queue, so deliveries
            // from all of them arrive through the one consumer.Queue.
            channel.BasicConsume(queueName, false, null, consumer);  //<-----
            channel.BasicConsume(queueName2, false, null, consumer); //<-----
            // etc. channel.BasicConsume(queueNameN, false, null, consumer);  //<-----

            // At this point, messages will be being asynchronously delivered,
            // and will be queueing up in consumer.Queue.

            while (true) {
                try {
                    // Take the next queued delivery, whichever queue it came from.
                    BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
                    // ... handle the delivery ...
                    // Acknowledge this delivery by its tag once it has been handled.
                    channel.BasicAck(e.DeliveryTag, false);
                } catch (EndOfStreamException ex) {
                    // The consumer was cancelled, the model closed, or the
                    // connection went away.
                    break;
                }
            }

-- UPDATE 5 : a simple get that will act on any queue (a slower, but sometimes more convenient method).

            // Single pull from queueName: declare the queue, then request one message.
            ch.QueueDeclare(queueName);
            BasicGetResult result = ch.BasicGet(queueName, false);
            // A null result is treated as "no message waiting".
            if (result == null) {
                Console.WriteLine("No message available.");
            } else {
                // Acknowledge the fetched delivery, identified by its delivery tag.
                ch.BasicAck(result.DeliveryTag, false);
                Console.WriteLine("Message:"); 
                // deserialize body and display extra info here.
            }
like image 117
14 revs Avatar answered Sep 30 '22 14:09

14 revs


The easiest way is to use the EventingBasicConsumer. I have an example on my site on how to use it. RabbitMQ EventingBasicConsumer

This Consumer class exposes a Received Event you can use, and therefore does NOT block. The rest of the code basically stays the same.

like image 24
Kelly Avatar answered Sep 30 '22 14:09

Kelly