How to retract a message in RabbitMQ?

I've got something like a job queue over RabbitMQ and, upon a request to cancel a job, I'd like to retract the tasks that have not yet started processing (their messages have not been ack'd), which corresponds to retracting these messages from the queues that they've been routed to.

I haven't found this functionality in AMQP or in the RabbitMQ API; perhaps I haven't searched well enough? Or will I have to use a workaround (it's not hard, but still)?

jkff asked Sep 21 '10 08:09



4 Answers

I would solve this scenario by having the worker check some sort of authoritative data source to determine whether the job should proceed. For example, before starting work, the worker would check the job's status in a database to see whether the job has already been canceled.

For scenarios where jobs may be processed faster than the authoritative store can be updated and read, a faster data store that gives up some guarantees in exchange for speed may be useful.

An example of this would be to use Redis, instead of a relational DB like MySQL, as the store for cancellation state. Redis is very fast but makes fewer guarantees regarding the data it holds, whereas MySQL is much slower but offers more guarantees about the data it holds.

In the end, the concept of checking with another source for whether or not to process a message is the same, but the way you implement that depends on your particular scenario.
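A minimal sketch of that check, assuming a hypothetical `jobs` table with `id` and `status` columns. SQLite stands in here for whatever authoritative store you use; the schema and the `should_process` and `handle_task` helpers are illustrative names, not part of RabbitMQ or any client library:

```python
import sqlite3

def should_process(conn, job_id):
    """Return True only if the job exists and has not been cancelled."""
    row = conn.execute(
        "SELECT status FROM jobs WHERE id = ?", (job_id,)
    ).fetchone()
    return row is not None and row[0] != "cancelled"

def handle_task(conn, job_id, payload, work):
    """Run work(payload) unless the job was cancelled; report what happened."""
    if not should_process(conn, job_id):
        return "skipped"  # the caller would still ack the message here
    work(payload)
    return "done"

# Hypothetical schema, created in memory for the example.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY, status TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO jobs VALUES (?, ?)",
    [("job-1", "pending"), ("job-2", "cancelled")],
)

processed = []
results = [
    handle_task(conn, "job-1", "a", processed.append),
    handle_task(conn, "job-2", "b", processed.append),
]
```

In a real worker, `handle_task` would be called from the message callback, and a skipped task would still be acknowledged so that the broker drops it.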

cdeszaq answered Oct 21 '22 18:10


RabbitMQ doesn't let you modify or delete messages after they've been enqueued. For that, you want some kind of database to hold the state of each job, and to use RabbitMQ to notify interested parties of changes in that state.

For lowish volumes, you can kludge it together with a queue per job. Create the queue, post the job description to the queue, and announce the name of the queue to the workers. If the job needs to be cancelled before it is processed, delete the job's queue; when the workers come to fetch the job description, they'll notice the queue has vanished.

A lighter-weight and generally better approach would be to use Redis or another key/value store to hold the job state (with a deleted or absent record meaning a cancelled or nonexistent job) and to use RabbitMQ to notify about new/removed/changed records in the key/value store.
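The key/value variant could look roughly like the sketch below. To keep it runnable without any servers, a plain dict stands in for the key/value store and `queue.Queue` stands in for the RabbitMQ queue; `submit`, `cancel`, and `drain_worker` are hypothetical names chosen for illustration:

```python
import queue

job_store = {}                 # stand-in for Redis or another key/value store
notifications = queue.Queue()  # stand-in for a RabbitMQ queue of job ids

def submit(job_id, description):
    """Store the job state, then notify workers with just the key."""
    job_store[job_id] = description
    notifications.put(job_id)

def cancel(job_id):
    """Cancel by deleting the record; the notification stays in the queue."""
    job_store.pop(job_id, None)

def drain_worker():
    """Process every notified job whose record still exists."""
    done = []
    while not notifications.empty():
        job_id = notifications.get()
        description = job_store.get(job_id)
        if description is None:
            continue  # record is absent: the job was cancelled
        done.append((job_id, description))
    return done

submit("job-1", "resize image")
submit("job-2", "send email")
cancel("job-2")
completed = drain_worker()
```

The message itself is never retracted; it simply becomes a no-op once the record it points to is gone.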

Tony Garnock-Jones answered Oct 21 '22 19:10


At least two ways to achieve your target:

  • basic.reject will requeue a delivered message if requeue=true is set (otherwise the broker discards the message).
    (supported since RabbitMQ 2.0.0; see http://www.rabbitmq.com/blog/2010/08/03/well-ill-let-you-go-basicreject-in-rabbitmq/).

  • basic.recover will ask the broker to redeliver the unacked messages on a channel.
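Both methods act on messages that have already been delivered to a consumer, so on their own they do not remove anything still waiting in a queue. One way they might be applied to cancellation is for the worker to reject, without requeueing, any delivery that belongs to a cancelled job. The sketch below assumes a channel object exposing `basic_ack` and `basic_reject` with the same names and keyword arguments as the pika Python client; `RecordingChannel`, `handle_delivery`, and the `cancelled_jobs` set are hypothetical stand-ins so the example runs without a broker:

```python
class RecordingChannel:
    """Stand-in for a real channel; records calls instead of using a broker."""
    def __init__(self):
        self.calls = []

    def basic_ack(self, delivery_tag):
        self.calls.append(("ack", delivery_tag))

    def basic_reject(self, delivery_tag, requeue=True):
        self.calls.append(("reject", delivery_tag, requeue))

def handle_delivery(channel, delivery_tag, job_id, cancelled_jobs, work):
    """Reject deliveries for cancelled jobs; run and ack everything else."""
    if job_id in cancelled_jobs:
        # requeue=False asks the broker not to put the message back.
        channel.basic_reject(delivery_tag=delivery_tag, requeue=False)
        return
    work(job_id)
    channel.basic_ack(delivery_tag=delivery_tag)

channel = RecordingChannel()
ran = []
handle_delivery(channel, 1, "job-1", {"job-2"}, ran.append)
handle_delivery(channel, 2, "job-2", {"job-2"}, ran.append)
```

How the worker learns which jobs are cancelled is left open here; the set is just a placeholder for that lookup.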

Sergei Zimakov answered Oct 21 '22 18:10


You need to subscribe to all the queues to which messages have been routed, and consume them with ack.

For instance, if you publish to a topic exchange with "test" as the routing key, and there are 3 persistent queues bound with "test", you would need to consume from those three queues. It might be better to add another queue that your consumer processes also listen to, and use it to tell them to ignore those messages.

An alternative, since you are using RabbitMQ, is to write a custom exchange plugin that will accept some out-of-band instruction to clear all queues. For instance, you might have that exchange read a special message header that tells it to clear all queues to which this message is destined. This does require writing Erlang code, but there are 4 different exchange types implemented, so you would only need to copy the most similar one and write the code for the new behaviours. If you only use custom headers for this, then the body of the message can be a normal message for the consumers.

To sum up:

1) the publisher needs to consume the messages itself
2) the publisher can send a special message to a special queue to tell consumers to ignore the message
3) the publisher can send a special message to a custom exchange that will clear any existing messages from the queues before sending this special message to consumers.
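Option 1 can be sketched as a drain loop: fetch every waiting message, ack the ones that belong to the cancelled job, and requeue the rest once the queue is empty. The sketch assumes a channel whose `basic_get`, `basic_ack`, and `basic_reject` behave like those of the pika Python client (`basic_get` returning `(None, None, None)` on an empty queue). `FakeChannel`, `retract`, and the `job-id:` body prefix are hypothetical, included only so the example runs without a broker:

```python
from collections import deque
from types import SimpleNamespace

class FakeChannel:
    """In-memory stand-in for a broker channel, for illustration only."""
    def __init__(self, bodies):
        self.ready = deque(bodies)  # messages waiting in the queue
        self.unacked = {}           # delivery_tag -> body
        self.next_tag = 1

    def basic_get(self, queue, auto_ack=False):
        if not self.ready:
            return None, None, None
        tag, self.next_tag = self.next_tag, self.next_tag + 1
        self.unacked[tag] = self.ready.popleft()
        return SimpleNamespace(delivery_tag=tag), None, self.unacked[tag]

    def basic_ack(self, delivery_tag):
        del self.unacked[delivery_tag]

    def basic_reject(self, delivery_tag, requeue=True):
        body = self.unacked.pop(delivery_tag)
        if requeue:
            self.ready.append(body)

def retract(channel, queue_name, is_cancelled):
    """Ack (remove) messages matching is_cancelled; requeue all others."""
    keep, removed = [], 0
    while True:
        method, _props, body = channel.basic_get(queue=queue_name, auto_ack=False)
        if method is None:  # queue is empty
            break
        if is_cancelled(body):
            channel.basic_ack(delivery_tag=method.delivery_tag)
            removed += 1
        else:
            keep.append(method.delivery_tag)  # hold until the drain finishes
    for tag in keep:
        # Requeue only after draining, so no message is fetched twice.
        channel.basic_reject(delivery_tag=tag, requeue=True)
    return removed

channel = FakeChannel([b"job-1:a", b"job-2:b", b"job-1:c"])
removed = retract(channel, "tasks", lambda body: body.startswith(b"job-1:"))
```

Against a real broker this only reaches messages that have not yet been delivered to another consumer, it temporarily hides the kept messages from workers, and it may change their order, so it fits low-volume queues best.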

Michael Dillon answered Oct 21 '22 18:10