I'm trying to use a manual ACK on a very simple console application, but I can't make it work.
On the sender, I have the following code:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.ConfirmSelect();
channel.BasicAcks += (sender, e) =>
{
Console.Write("ACK received");
};
var properties = channel.CreateBasicProperties();
channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
On the receiver I have the following code:
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
channel.ConfirmSelect();
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue",
noAck: false,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
What I expect is that the event BasicAcks
on the sender is fired when I call channel.BasicAck()
on the receiver, but that event is being fired when the message is delivered to the client, before consumer.Received
.
Is what I'm expecting the correct behavior or am I missing something?
ack is used for positive acknowledgements. basic. nack is used for negative acknowledgements (note: this is a RabbitMQ extension to AMQP 0-9-1) basic. reject is used for negative acknowledgements but has one limitation compared to basic.
Applications can subscribe to have RabbitMQ push enqueued messages (deliveries) to them. This is done by registering a consumer (subscription) on a queue. After a subscription is in place, RabbitMQ will begin delivering messages. For each delivery a user-provided handler will be invoked.
Your expectation is not correct. BasicAcks
is about publisher confirms, not about ack from receiver. So you publish a message to broker and broker (so, RabbitMQ itself) will ack or nack (negative acknowledge) you when it handles this message (for example - when it will write it to disk for persistent messages, or when in puts it in queue). Note that no receiver is involved here - it's entirely between publisher and RabbitMQ.
Now when you Ack message at receiver - that's again only between receiver and RabbitMQ - you tell rabbit that message is processed and can be safely deleted. This is done to handle situations when receiver crashes during processing - then rabbit will be able to deliver this message to the next receiver (if any).
Note that the whole purpose of such arcitecture is to separate publishers and receivers - they should not be dependent on each other.
If you have one receiver (there can be many) and you want to ensure it processed your message - use RPC pattern: send message and wait for another message back from this receiver.
Consumer:
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
await Task.Delay(2000);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Console.WriteLine(" [x] Done");
};
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With