Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RabbitMQ C# verify message was sent

Tags:

c#

rabbitmq

I'm new to RabbitMQ and trying to write to a Queue and verify the message was sent. If it fails I need to know about it. I made a fake queue to watch it fail but no matter what I see no execptions and when I am looking for a ack I always get one. I never see the BasicNack.

I'm not even sure i'm the BasicAcks is the way to go.

    private void button1_Click(object sender, EventArgs e)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("task_queue", true, false, false, null);

                var message = ("Helllo world");
                var body = Encoding.UTF8.GetBytes(message);
                channel.ConfirmSelect();

                var properties = channel.CreateBasicProperties();
                properties.SetPersistent(true);
                properties.DeliveryMode = 2;
                channel.BasicAcks += channel_BasicAcks;
                channel.BasicNacks += channel_BasicNacks;
                //fake queue should be task_queue
                channel.BasicPublish("", "task_2queue", true, properties, body);

                channel.WaitForConfirmsOrDie();

                Console.WriteLine(" [x] Sent {0}", message);
            }
        }
    }

    void channel_BasicNacks(IModel model, BasicNackEventArgs args)
    {

    }

    void channel_BasicAcks(IModel model, BasicAckEventArgs args)
    {

    }
like image 782
Andre DeMattia Avatar asked Jun 10 '14 22:06

Andre DeMattia


2 Answers

For those looking for a C# answer - here is what you need.

https://rianjs.net/2013/12/publisher-confirms-with-rabbitmq-and-c-sharp

Something like this: (BasicAcks attaches an event handler - there is also BasicNacks)

using (var connection = FACTORY.CreateConnection())
{
    var channel = connection.CreateModel();
    channel.ExchangeDeclare(QUEUE_NAME, ExchangeType.Fanout, true);
    channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
    channel.QueueBind(QUEUE_NAME, QUEUE_NAME, String.Empty, new Dictionary<string, object>());
     channel.BasicAcks += (sender, eventArgs) =>
                {
                    //implement ack handle
                };
    channel.ConfirmSelect();

    for (var i = 1; i <= numberOfMessages; i++)
    {
        var messageProperties = channel.CreateBasicProperties();
        messageProperties.SetPersistent(true);

        var message = String.Format("{0}\thello world", i);
        var payload = Encoding.Unicode.GetBytes(message);
        Console.WriteLine("Sending message: " + message);
        channel.BasicPublish(QUEUE_NAME, QUEUE_NAME, messageProperties, payload);
        channel.WaitForConfirmsOrDie();
    }
}
like image 91
Todd Vance Avatar answered Sep 24 '22 05:09

Todd Vance


You need a Publisher Confirms

as you can read you can implement:

The transaction:

ch.txSelect(); <-- start transaction
ch.basicPublish("", QUEUE_NAME,
                            MessageProperties.PERSISTENT_BASIC,
                            "nop".getBytes());
ch.txCommit();<--commit transaction

The message is stored to the queue and to the disk. This way can be slow, if you need performance you shouldn't use it.

You can use the Streaming Lightweight Publisher Confirms, using:

ch.setConfirmListener(new ConfirmListener() {
    public void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            unconfirmedSet.headSet(seqNo+1).clear();
        } else {
            unconfirmedSet.remove(seqNo);
        }
    }
    public void handleNack(long seqNo, boolean multiple) {
        // handle the lost messages somehow
    }

I hope it helps

like image 20
Gabriele Santomaggio Avatar answered Sep 24 '22 05:09

Gabriele Santomaggio