Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RabbitMQ Queue with no subscribers

Tags:

c#

rabbitmq

"Durable" and "persistent mode" appear to relate to reboots rather than relating to there being no subscribers to receive the message.

I'd like RabbitMQ to keep messages on the queue when there are no subscribers. When a subscriber does come online, the message should be recieved by that subscriber. Is this possible with RabbitMQ?

Code sample:

Server:

namespace RabbitEg
{
    class Program
    {
        private const string EXCHANGE_NAME = "helloworld";

        static void Main(string[] args)
        {
            ConnectionFactory cnFactory = new RabbitMQ.Client.ConnectionFactory() { HostName = "localhost" };

            using (IConnection cn = cnFactory.CreateConnection())
            {
                using (IModel channel = cn.CreateModel())
                {
                    //channel.ExchangeDelete(EXCHANGE_NAME);
                    channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true);
                    //channel.BasicReturn += new BasicReturnEventHandler(channel_BasicReturn);

                    for (int i = 0; i < 100; i++)
                    {
                        byte[] payLoad = Encoding.ASCII.GetBytes("hello world _ " + i);
                        IBasicProperties channelProps = channel.CreateBasicProperties();
                        channelProps.SetPersistent(true);

                        channel.BasicPublish(EXCHANGE_NAME, "routekey_helloworld", false, false, channelProps, payLoad);

                        Console.WriteLine("Sent Message " + i);
                        System.Threading.Thread.Sleep(25);
                    }

                    Console.ReadLine();
                }
            }
        }
    }
}

Client:

namespace RabbitListener
{
    class Program
    {
        private const string EXCHANGE_NAME = "helloworld";

        static void Main(string[] args)
        {
            ConnectionFactory cnFactory = new ConnectionFactory() { HostName = "localhost" };

            using (IConnection cn = cnFactory.CreateConnection())
            {
                using (IModel channel = cn.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true);

                    string queueName = channel.QueueDeclare("myQueue", true, false, false, null);
                    channel.QueueBind(queueName, EXCHANGE_NAME, "routekey_helloworld");

                    Console.WriteLine("Waiting for messages");

                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queueName, true, consumer);

                    while (true)
                    {
                        BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        Console.WriteLine(Encoding.ASCII.GetString(e.Body));
                    }
                }
            }
        }
    }
}
like image 683
Mr Grok Avatar asked Oct 31 '11 09:10

Mr Grok


1 Answers

See the AMQP Reference for an explanation of what durable and persistent mean.

Basically, queues are either durable or non-durable. The former survive broker restarts, the latter do not.

Messages are published as either transient or persistent. The idea is that persistent messages on durable queues should also survive broker restarts.

So, to get what you want, you need to 1) declare the queue as durable and 2) publish the messages as persistent. In addition, you may also want to enable publisher confirms on the channel; that way, you'll know when the broker has assumed responsibility for the message.

like image 126
scvalex Avatar answered Oct 22 '22 04:10

scvalex