RabbitMQ Manual ACK on c# client

Tags: c#, rabbitmq

I'm trying to use a manual ACK on a very simple console application, but I can't make it work.

On the sender, I have the following code:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "task_queue",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    var message = GetMessage(args);
    var body = Encoding.UTF8.GetBytes(message);

    channel.ConfirmSelect();
    channel.BasicAcks += (sender, e) =>
    {
        Console.Write("ACK received");
    };

    var properties = channel.CreateBasicProperties();

    channel.BasicPublish(exchange: "",
                         routingKey: "task_queue",
                         basicProperties: properties,
                         body: body);

    Console.WriteLine(" [x] Sent {0}", message);
}

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

On the receiver I have the following code:

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "task_queue",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    channel.ConfirmSelect();

    Console.WriteLine(" [*] Waiting for messages.");

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(" [x] Received {0}", message);

        int dots = message.Split('.').Length - 1;
        Thread.Sleep(dots * 1000);

        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        Console.WriteLine(" [x] Done");
    };
    channel.BasicConsume(queue: "task_queue",
                         noAck: false,
                         consumer: consumer);

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}

I expect the BasicAcks event on the sender to fire when I call channel.BasicAck() on the receiver, but it fires as soon as the message is delivered to the client, before consumer.Received.

Is what I'm expecting the correct behavior or am I missing something?

Mat-Tap asked May 18 '16 18:05


2 Answers

Your expectation is not correct. BasicAcks relates to publisher confirms, not to acknowledgements from the receiver. When you publish a message, the broker (RabbitMQ itself) acks or nacks (negatively acknowledges) it once it has handled the message, for example once it has written a persistent message to disk or placed the message in a queue. Note that no receiver is involved here: this exchange is entirely between the publisher and RabbitMQ.
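A minimal sketch of the publisher side of this, assuming RabbitMQ.Client 6.x and a broker on localhost. The class name ConfirmedSender, the message text, and the 5-second timeout are illustrative choices, not part of the question. The point is that BasicAcks and BasicNacks report what the broker did with the publish, so the publisher waits for the broker rather than for any consumer:

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

class ConfirmedSender // hypothetical name, for illustration only
{
    static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "task_queue", durable: true,
                                 exclusive: false, autoDelete: false, arguments: null);

            // Put the channel into confirm mode before publishing.
            channel.ConfirmSelect();

            // Both events are raised by the broker's response to a publish,
            // not by a consumer calling BasicAck.
            channel.BasicAcks += (sender, e) =>
                Console.WriteLine("Broker confirmed publish #{0}", e.DeliveryTag);
            channel.BasicNacks += (sender, e) =>
                Console.WriteLine("Broker could not handle publish #{0}", e.DeliveryTag);

            // Persistent messages on a durable queue are confirmed once written to disk.
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            var body = Encoding.UTF8.GetBytes("Hello...");
            channel.BasicPublish(exchange: "", routingKey: "task_queue",
                                 basicProperties: properties, body: body);

            // Block until the broker has confirmed everything published so far;
            // throws if a nack arrives or the (assumed) 5 second timeout elapses.
            channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
        }
    }
}
```

Waiting for the confirm before the channel is disposed also avoids closing the connection while a confirm is still in flight.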

When you ack a message at the receiver, that is again only between the receiver and RabbitMQ: you tell RabbitMQ that the message has been processed and can be safely deleted. This handles the case where a receiver crashes during processing: RabbitMQ can then redeliver the message to the next receiver (if any).
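The receiver side can be sketched the same way, again assuming RabbitMQ.Client 6.x. AckingReceiver and Process are hypothetical names used only for this example. The message is acked after successful processing and nacked with requeue on failure, so the broker keeps it until some receiver finishes the work:

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class AckingReceiver // hypothetical name, for illustration only
{
    static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "task_queue", durable: true,
                                 exclusive: false, autoDelete: false, arguments: null);

            // Hand this consumer at most one unacknowledged message at a time.
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                try
                {
                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    Process(message);

                    // Success: the broker may now delete the message.
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(" [!] Processing failed: {0}", ex.Message);

                    // Failure: ask the broker to put the message back on the queue.
                    channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                }
            };

            // autoAck: false is what makes the explicit BasicAck above necessary.
            channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    // Placeholder for the real work done per message.
    static void Process(string message)
    {
        Console.WriteLine(" [x] Processing {0}", message);
    }
}
```

Requeueing unconditionally can redeliver a message that always fails forever, so a real receiver would probably cap retries or route failures elsewhere.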

Note that the whole purpose of this architecture is to decouple publishers and receivers: they should not depend on each other.

If you have one receiver (there can be many) and you want to be sure it processed your message, use the RPC pattern: send the message and wait for a reply message from that receiver.
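A rough sketch of the sending half of that RPC pattern, assuming RabbitMQ.Client 6.x. The class name RpcSender, the 30-second timeout, and the message text are illustrative assumptions. The sender creates a private reply queue, tags the request with a correlation id, and blocks until a matching reply arrives:

```csharp
using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class RpcSender // hypothetical name, for illustration only
{
    static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "task_queue", durable: true,
                                 exclusive: false, autoDelete: false, arguments: null);

            // Server-named, exclusive queue that only this sender reads replies from.
            var replyQueue = channel.QueueDeclare().QueueName;
            var correlationId = Guid.NewGuid().ToString();
            var replies = new BlockingCollection<string>();

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                // Ignore replies that do not belong to this request.
                if (ea.BasicProperties.CorrelationId == correlationId)
                {
                    replies.Add(Encoding.UTF8.GetString(ea.Body.ToArray()));
                }
            };
            channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: consumer);

            // Tell the receiver where to reply and how to label the reply.
            var properties = channel.CreateBasicProperties();
            properties.ReplyTo = replyQueue;
            properties.CorrelationId = correlationId;

            channel.BasicPublish(exchange: "", routingKey: "task_queue",
                                 basicProperties: properties,
                                 body: Encoding.UTF8.GetBytes("Hello..."));

            // Wait for the receiver's reply; the 30 second limit is an assumption.
            if (replies.TryTake(out var reply, TimeSpan.FromSeconds(30)))
            {
                Console.WriteLine(" [x] Receiver replied: {0}", reply);
            }
            else
            {
                Console.WriteLine(" [!] No reply within the timeout");
            }
        }
    }
}
```

For this to work, the receiver has to publish its reply to the default exchange with routingKey set to ea.BasicProperties.ReplyTo and copy ea.BasicProperties.CorrelationId into the reply's properties once it has finished processing.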

Evk answered Sep 28 '22 04:09


Consumer:

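consumer.Received += async (model, ea) =>
{
    // From RabbitMQ.Client 6.0 on, ea.Body is a ReadOnlyMemory<byte>,
    // so copy it to a byte[] before decoding.
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);

    // Simulate work: one second per dot in the message, without blocking the thread.
    int dots = message.Split('.').Length - 1;
    await Task.Delay(dots * 1000);

    // Manual ack: tell the broker this delivery was processed and can be removed.
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    Console.WriteLine(" [x] Done");
};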
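For an async handler like this, one option is the client's dedicated async consumer. The following is a sketch assuming RabbitMQ.Client 6.x, where ConnectionFactory.DispatchConsumersAsync and AsyncEventingBasicConsumer are available; the class name AsyncReceiver is illustrative. The handler returns a Task, so the client can await it instead of running it as async void:

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class AsyncReceiver // hypothetical name, for illustration only
{
    static void Main()
    {
        // DispatchConsumersAsync must be enabled for AsyncEventingBasicConsumer to receive deliveries.
        var factory = new ConnectionFactory() { HostName = "localhost", DispatchConsumersAsync = true };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "task_queue", durable: true,
                                 exclusive: false, autoDelete: false, arguments: null);
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            var consumer = new AsyncEventingBasicConsumer(channel);
            consumer.Received += async (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine(" [x] Received {0}", message);

                // Simulated work: one second per dot in the message.
                int dots = message.Split('.').Length - 1;
                await Task.Delay(dots * 1000);

                // Ack only after the awaited work has completed.
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                Console.WriteLine(" [x] Done");
            };

            channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}
```

Later major versions of the client changed this API, so the names here should be checked against the version actually installed.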
Son Nguyen answered Sep 28 '22 04:09