After the consumer gets a message, consumer/worker does some validations and then call web service. In this phase, if any error occurs or validation fails, we want the message put back to the queue it was originally consumed from.
I have read RabbitMQ documentation. But I am confused about differences between reject, nack and cancel methods.
To reject messages in bulk, clients set the multiple flag of the basic. nack method to true . The broker will then reject all unacknowledged, delivered messages up to and including the message specified in the delivery_tag field of the basic. nack method.
The original expiry time of a message is preserved if it is requeued (for example due to the use of an AMQP method that features a requeue parameter, or due to a channel closure). Setting the TTL to 0 causes messages to be expired upon reaching a queue unless they can be delivered to a consumer immediately.
An Unacknowledged message implies that it has been read by your consumer, but the consumer has never sent back an ACK to the RabbitMQ broker to say that it has finished processing it.
Short answer:
To requeue specific message you can pick both basic.reject
or basic.nack
with multiple
flag set to false.
basic.consume
calling may also results to messages redelivering if you are using message acknowledge and there are un-acknowledged message on consumer at specific time and consumer exit without ack-ing them.
basic.recover
will redeliver all un-acked messages on specific channel.
Long answer:
basic.reject
and basic.nack
both serves to same purpose - drop or requeue message that can't be handled by specific consumer (at the given moment, under certain conditions or at all). The main difference between them is that basic.nack
supports bulk messages processing, whilst basic.reject
doesn't.
This difference described in Negative Acknowledgements article on official RabbitMQ web site:
The AMQP specification defines the
basic.reject
method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them. Unfortunately,basic.reject
provides no support for negatively acknowledging messages in bulk.To solve this, RabbitMQ supports the
basic.nack
method that provides all the functionality ofbasic.reject
whilst also allowing for bulk processing of messages.To reject messages in bulk, clients set the
multiple
flag of thebasic.nack
method totrue
. The broker will then reject all unacknowledged, delivered messages up to and including the message specified in thedelivery_tag
field of thebasic.nack
method. In this respect,basic.nack
complements the bulk acknowledgement semantics ofbasic.ack
.
Note, that basic.nack
method is RabbitMQ-specific extension while basic.reject
method is part of AMQP 0.9.1 specification.
As to basic.cancel
method, it used to notify server that client stops message consuming. Note, that client may receive arbitrary messages number between basic.cancel
method sending an receiving the cancel-ok
reply. If message acknowledge is used by client and it has any un-acknowledged messages they will be moved back to the queue they originally was consumed from.
basic.recover
has some limitations in RabbitMQ: it - basic.recover with requeue=false - basic.recover synchronicity
In addition to errata, according to RabbitMQ specs basic.recover
has partial support (Recovery with requeue=false is not supported.)
Note about basic.consume
:
When basic.consume
started without auto-ack (noack=false
) and there are some pending messages non-acked messages, then when consumer get canceled (dies, fatal error, exception, whatever) that pending messages will be redelivered. Technically, that pending messages will not be processed (even dead-lettered) until consumer release them (ack/nack/reject/recover). Only after that they will be processed (e.g. deadlettered).
For example, let say we post originally 5 message in a row:
Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
And then consume 3 of them, but not ack them, and then cancel consumer. We will have this situation:
Queue(main) (tail) { [4] [3] [2*] [1*] [0*] } (head)
where star (*
) notes that redelivered
flag set to true
.
Assume that we have situation with dead-lettered exchange set and queue for dead-lettered messages
Exchange(e-main) Exchange(e-dead) Queue(main){x-dead-letter-exchange: "e-dead"} Queue(dead)
And assume we post 5 message with expire
property set to 5000
(5 sec):
Queue(main) (tail) { [4] [3] [2] [1] [0] } (head) Queue(dead) (tail) { }(head)
and then we consume 3 message from main
queue and hold them for 10 second:
Queue(main) (tail) { [2!] [1!] [0!] } (head) Queue(dead) (tail) { [4*] [3*] } (head)
where exclamation mark (!
) stands for unacked message. Such messages can't be delivered to any consumer and they normally can't be viewed in management panel. But let's cancel consumer, remember, that it still hold 3 un-acked message:
Queue(main) (tail) { } (head) Queue(dead) (tail) { [2*] [1*] [0*] [4*] [3*] } (head)
So now that 3 messages which was in the head put back to original queue, but as they has per-message TTL set, they are dead-lettered to the tail of dead-letter queue (sure, via dead-letter exchange).
P.S.:
Consuming message aka listening for new one is somehow different from direct queue access (getting one or more message without taking care of others). See basic.get
method description for more.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With