def get_connection_and_channel(self, connection_parameters):
connection = pika.BlockingConnection(connection_parameters)
channel = connection.channel()
return (connection, channel)
connection_parameters = pika.ConnectionParameters( server, port, virtual_host, credentials=pika.PlainCredentials(user_name, password))
connection,channel = self.get_connection_and_channel(connection_parameters)
channel.confirm_delivery()
count=0
for json_string in open(json_file, 'r'):
result_json = json.loads(json_string)
message_body = json.dumps(result_json['body'])
routing_key = result_json['RoutingKey']
channel.basic_publish(exchange=self.output_exchange_name,routing_key=routing_key,body=message_body.strip())
count += 1
self.logger.info('Sent %d messages' % count)
connection.close()
I am using this code to send out messages to a RabbitMQ server. But once in a while this is not sending all the messages to the corresponding queue. It's missing random number of messages each time it runs.
I can't understand what's the problem here.
Chances are that your messages are being returned as it couldn't route the message to any existing queue. Try adding a callback in channel.confirm_delivery
:
channel.confirm_delivery(on_delivery_confirmation)
def on_delivery_confirmation(self, method_frame):
confirmation_type = method_frame.method.NAME.split('.')[1].lower()
if confirmation_type == 'ack':
self.logger.info('message published')
elif confirmation_type == 'nack':
self.logger.info('message not routed')
If this is the case, then try binding a consumer queue first with the exchange and routing key before publishing the message.
First, enable durable queues adding:
channel.queue_declare(queue='your_queue', durable=True)
to both, publisher and consumer (before doing the publishing/consuming).
Then you can be sure your queue won't be lost, even if the RabbitMQ server dies and restarts.
On the publisher, add properties=pika.BasicProperties(delivery_mode=2)
to your basic_publish
call, to make sure your messages are persistent.
channel.basic_publish(exchange=self.output_exchange_name,
routing_key=routing_key,
body=message_body.strip(),
properties=pika.BasicProperties(delivery_mode=2))
That should do the trick to avoid losing _published messages.
From the consumer point of view, the official RabbitMQ tutorial for python says:
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it. [...] Message acknowledgments are turned on by default.
When you build the consumer, ensure you are sending ack properly, to let RabbitMQ erase it from the queue.
def callback(ch, method, properties, body):
print "Received %r" % (body,)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback, queue='your_queue')
If you need a stronger and more reliable method to be totally sure of publishing confirmation relaying on RabbitMQ, you should use the plublish confirm feature of the AMQP protocol.
From pika documentation:
import pika
# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection()
# Open the channel
channel = connection.channel()
# Declare the queue
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False)
# Turn on delivery confirmations
channel.confirm_delivery()
# Send a message
if channel.basic_publish(exchange='test',
routing_key='test',
body='Hello World!',
properties=pika.BasicProperties(content_type='text/plain',
delivery_mode=1)):
print 'Message publish was confirmed'
else:
print 'Message could not be confirmed'
So according to your code, I'll use something similar to:
count=0
for json_string in open(json_file, 'r'):
result_json = json.loads(json_string)
message_body = json.dumps(result_json['body'])
routing_key = result_json['RoutingKey']
if channel.basic_publish(exchange=self.output_exchange_name,routing_key=routing_key,body=message_body.strip(),
properties=pika.BasicProperties(delivery_mode=2)): # Make it persistent
count += 1
else:
# Do something with your undelivered message
self.logger.info('Sent %d messages' % count)
connection.close()
Or as a brute-force approach, you can use a while
loop instead of an if
to ensure all of your messages are sent:
count = 0
for json_string in open(json_file, 'r'):
result_json = json.loads(json_string)
message_body = json.dumps(result_json['body'])
routing_key = result_json['RoutingKey']
while not channel.basic_publish(exchange=self.output_exchange_name,
routing_key=routing_key,
body=message_body.strip(),
properties=pika.BasicProperties(delivery_mode=2)):
pass # Do nothing or even you can count retries
count += 1
self.logger.info('Sent %d messages' % count)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With