It seems the longer I keep my rabbitmq server running, the more trouble I have with unacknowledged messages. I would love to requeue them. In fact there seems to be an amqp command to do this, but it only applies to the channel that your connection is using. I built a little pika script to at least try it out, but I am either missing something or it cannot be done this way (how about with rabbitmqctl?)
import pika credentials = pika.PlainCredentials('***', '***') parameters = pika.ConnectionParameters(host='localhost',port=5672,\ credentials=credentials, virtual_host='***') def handle_delivery(body): """Called when we receive a message from RabbitMQ""" print body def on_connected(connection): """Called when we are fully connected to RabbitMQ""" connection.channel(on_channel_open) def on_channel_open(new_channel): """Called when our channel has opened""" global channel channel = new_channel channel.basic_recover(callback=handle_delivery,requeue=True) try: connection = pika.SelectConnection(parameters=parameters,\ on_open_callback=on_connected) # Loop so we can communicate with RabbitMQ connection.ioloop.start() except KeyboardInterrupt: # Gracefully close the connection connection.close() # Loop until we're fully closed, will stop on its own connection.ioloop.start()
An Unacknowledged message implies that it has been read by your consumer, but the consumer has never sent back an ACK to the RabbitMQ broker to say that it has finished processing it.
The MQ Channel plug-in is part of a suite of plug-ins designed to monitor an IBM WebSphere MQ server. MQ is a middleware product which allows independent client programs to communicate by passing messages to each other via a message queue.
Unacknowledged messages are those which have been delivered across the network to a consumer but have not yet been ack'ed or rejected -- but that consumer hasn't yet closed the channel or connection over which it originally received them. Therefore the broker can't figure out if the consumer is just taking a long time to process those messages or if it has forgotten about them. So, it leaves them in an unacknowledged state until either the consumer dies or they get ack'ed or rejected.
Since those messages could still be validly processed in the future by the still-alive consumer that originally consumed them, you can't (to my knowledge) insert another consumer into the mix and try to make external decisions about them. You need to fix your consumers to make decisions about each message as they get processed rather than leaving old messages unacknowledged.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With