
How can I recover unacknowledged AMQP messages from other channels than my connection's own?

It seems that the longer I keep my RabbitMQ server running, the more trouble I have with unacknowledged messages. I would love to requeue them. There is an AMQP command for this (basic.recover), but it only applies to the channel that your own connection is using. I built a little pika script to try it out, but either I am missing something or it cannot be done this way. (Could it be done with rabbitmqctl?)

import pika

credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost', port=5672,
                                       credentials=credentials, virtual_host='***')

def handle_delivery(body):
    """Called when we receive a message from RabbitMQ"""
    print(body)

def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_open_callback=on_channel_open)

def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    # Asks the broker to redeliver unacknowledged messages on this channel only
    channel.basic_recover(callback=handle_delivery, requeue=True)

try:
    connection = pika.SelectConnection(parameters=parameters,
                                       on_open_callback=on_connected)
    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()
Will Olbrys asked Aug 15 '11 09:08



1 Answer

Unacknowledged messages are those that have been delivered across the network to a consumer but have not yet been ack'ed or rejected, while that consumer still holds open the channel or connection over which it originally received them. The broker therefore can't tell whether the consumer is just taking a long time to process those messages or has forgotten about them, so it leaves them in an unacknowledged state until either the consumer dies or they get ack'ed or rejected.
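
Since the question asks about rabbitmqctl: it cannot requeue another channel's messages, but it can show which connections are holding unacknowledged ones. The sketch below is one way to read that output from Python. It assumes that `rabbitmqctl -q list_channels connection messages_unacknowledged` prints one tab-separated row per channel (the exact layout can vary by RabbitMQ version, so anything that does not match is skipped). The helper names `list_channels_output` and `channels_with_unacked` are made up for this example.

```python
import subprocess


def list_channels_output():
    """Run rabbitmqctl and return its raw text output (needs access to the broker node)."""
    result = subprocess.run(
        ["rabbitmqctl", "-q", "list_channels", "connection", "messages_unacknowledged"],
        check=True,
        capture_output=True,
        text=True,
    )
    return result.stdout


def channels_with_unacked(output):
    """Return (connection_pid, unacked_count) pairs for channels with a non-zero count."""
    stuck = []
    for line in output.splitlines():
        fields = line.strip().split("\t")
        # Assumption: two tab-separated columns; skip headers and informational lines.
        if len(fields) != 2 or not fields[1].isdigit():
            continue
        count = int(fields[1])
        if count > 0:
            stuck.append((fields[0], count))
    return stuck
```

Calling `channels_with_unacked(list_channels_output())` on the broker host would list the offending connections. If one of them belongs to a consumer that is known to be stuck, `rabbitmqctl close_connection <connectionpid> <explanation>` closes it, at which point the broker requeues that connection's unacknowledged messages. That is a blunt operator action rather than a fix for the consumer.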

Since those messages could still be validly processed in the future by the still-alive consumer that originally consumed them, you can't (to my knowledge) insert another consumer into the mix and try to make external decisions about them. You need to fix your consumers to make decisions about each message as they get processed rather than leaving old messages unacknowledged.
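
As a rough illustration of "making a decision about each message", here is a minimal consumer sketch, assuming pika 1.x and its `BlockingConnection` API. The queue name `tasks` and the `process` function are placeholders invented for the example, and rejecting failed messages without requeueing is just one possible policy.

```python
def process(body):
    """Hypothetical application logic; raises if the message cannot be handled."""
    if not body:
        raise ValueError("empty message")


def on_message(channel, method, properties, body):
    """Settle every delivery so nothing is left unacknowledged."""
    try:
        process(body)
    except Exception:
        # Reject without requeue so a poison message cannot loop forever.
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    else:
        channel.basic_ack(delivery_tag=method.delivery_tag)


def run_consumer(queue_name="tasks", host="localhost"):
    """Consume from queue_name with manual acknowledgements (assumed names)."""
    import pika  # imported here so on_message can be exercised without a broker

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    # Limit how many unacknowledged messages this consumer may hold at once.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue=queue_name, on_message_callback=on_message, auto_ack=False)
    try:
        channel.start_consuming()
    finally:
        connection.close()
```

With a handler shaped like this, each delivery ends in either an ack or a nack, so messages only stay unacknowledged for as long as `process` is actually running.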

Brian Kelly answered Sep 19 '22 05:09