RabbitMQ - pika - Python - Dropping messages when published

def get_connection_and_channel(self, connection_parameters):
    connection = pika.BlockingConnection(connection_parameters)
    channel = connection.channel()
    return (connection, channel)  


connection_parameters = pika.ConnectionParameters(
    server, port, virtual_host,
    credentials=pika.PlainCredentials(user_name, password))

connection, channel = self.get_connection_and_channel(connection_parameters)

channel.confirm_delivery()
count = 0
for json_string in open(json_file, 'r'):
    result_json = json.loads(json_string)
    message_body = json.dumps(result_json['body'])
    routing_key = result_json['RoutingKey']
    channel.basic_publish(exchange=self.output_exchange_name,
                          routing_key=routing_key,
                          body=message_body.strip())
    count += 1
self.logger.info('Sent %d messages' % count)
connection.close()

I am using this code to send messages to a RabbitMQ server. Once in a while, not all of the messages reach the corresponding queue: a random number of messages goes missing on each run.

I can't figure out what the problem is.

Dheeraj Chakravarthi asked Jun 08 '16 07:06


2 Answers

Chances are that your messages are being returned because the broker could not route them to any existing queue. Try passing a callback to channel.confirm_delivery:

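def on_delivery_confirmation(self, method_frame):
    # method.NAME is 'Basic.Ack' or 'Basic.Nack'
    confirmation_type = method_frame.method.NAME.split('.')[1].lower()
    if confirmation_type == 'ack':
        self.logger.info('message published')
    elif confirmation_type == 'nack':
        self.logger.info('message not routed')

# Register the callback. This form is accepted by pika's asynchronous
# channels; BlockingChannel.confirm_delivery() takes no arguments.
channel.confirm_delivery(self.on_delivery_confirmation)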

If that is the case, bind a consumer queue to the exchange with the matching routing key before publishing the messages.
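
The binding step could look roughly like the sketch below. The helper name ensure_bound_queue and its parameters are hypothetical; it only assumes a pika-style channel that offers queue_declare and queue_bind, called here with keyword arguments. Whether the queue should be durable is also an assumption, and redeclaring an existing queue with different settings is rejected by the broker.

```python
def ensure_bound_queue(channel, exchange, queue, routing_key, durable=True):
    """Declare `queue` and bind it to `exchange` before any publishing starts.

    Hypothetical helper: `channel` is assumed to be a pika-style channel.
    """
    # Declaring is a no-op if the queue already exists with the same settings.
    channel.queue_declare(queue=queue, durable=durable)
    # Without a matching binding, the exchange has nowhere to route
    # messages published with this routing key.
    channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)
```

Calling this once per routing key found in the input file, before the publish loop, would make sure every message has a destination queue.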

hassansin answered Sep 22 '22 00:09


The simple (less reliable) way

First, declare the queue as durable by adding:

channel.queue_declare(queue='your_queue', durable=True)

to both the publisher and the consumer (before any publishing or consuming).

A durable queue is not lost even if the RabbitMQ server dies and restarts.

The publisher

On the publisher, add properties=pika.BasicProperties(delivery_mode=2) to your basic_publish call, to make sure your messages are persistent.

channel.basic_publish(exchange=self.output_exchange_name,
                      routing_key=routing_key,
                      body=message_body.strip(),
                      properties=pika.BasicProperties(delivery_mode=2))

That should prevent published messages from being lost when the broker restarts.

The consumer

From the consumer's point of view, the official RabbitMQ tutorial for Python says:

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it. [...] Message acknowledgments are turned on by default.

When you build the consumer, make sure it sends an ack for every message it has finished processing, so that RabbitMQ can remove that message from the queue.

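def callback(ch, method, properties, body):
    print("Received %r" % (body,))
    # Acknowledge only after the message has been handled
    ch.basic_ack(delivery_tag=method.delivery_tag)

# pika 0.x signature; pika 1.x expects
# basic_consume(queue='your_queue', on_message_callback=callback)
channel.basic_consume(callback, queue='your_queue')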
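
If the processing step can fail, one option is to ack only on success and hand the message back otherwise. The sketch below is an illustration, not part of the tutorial: make_callback, handle and requeue_on_error are hypothetical names, and it assumes the channel offers basic_ack and basic_nack as pika channels do.

```python
def make_callback(handle, requeue_on_error=True):
    """Wrap handle(body) so the message is acked only after it succeeds."""
    def callback(ch, method, properties, body):
        try:
            handle(body)
        except Exception:
            # Processing failed: reject the message so the broker
            # requeues it (or discards/dead-letters it if requeue is False).
            ch.basic_nack(delivery_tag=method.delivery_tag,
                          requeue=requeue_on_error)
        else:
            # Processing succeeded: the broker may now delete the message.
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback
```

Requeueing a message that always fails leads to endless redelivery, so requeue_on_error=False may be the safer choice for errors that will not go away.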

The truly safe way

If you need a stronger guarantee that RabbitMQ has actually accepted each message, use publisher confirms, a RabbitMQ extension to the AMQP protocol.

From the pika documentation:

import pika

# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection()

# Open the channel
channel = connection.channel()

# Declare the queue
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False)

# Turn on delivery confirmations
channel.confirm_delivery()

# Send a message
if channel.basic_publish(exchange='test',
                         routing_key='test',
                         body='Hello World!',
                         properties=pika.BasicProperties(content_type='text/plain',
                                                         delivery_mode=1)):
    print('Message publish was confirmed')
else:
    print('Message could not be confirmed')

Applied to your code, I would use something like:

count = 0
failed = []
for json_string in open(json_file, 'r'):
    result_json = json.loads(json_string)
    message_body = json.dumps(result_json['body'])
    routing_key = result_json['RoutingKey']
    if channel.basic_publish(exchange=self.output_exchange_name,
                             routing_key=routing_key,
                             body=message_body.strip(),
                             properties=pika.BasicProperties(delivery_mode=2)):  # Make it persistent
        count += 1
    else:
        # Do something with your undelivered message, e.g. keep it for a later retry
        failed.append(result_json)
self.logger.info('Sent %d messages' % count)
connection.close()
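
One caveat: the boolean return value is pika 0.x behaviour. From pika 1.0 on, BlockingChannel.basic_publish returns None and instead raises pika.exceptions.UnroutableError or pika.exceptions.NackError when confirms are enabled. A rough sketch of a wrapper that tolerates both behaviours follows; make_publisher and its errors parameter are hypothetical, and the default of catching every Exception is a deliberately crude assumption that you would narrow to the two pika exceptions above.

```python
def make_publisher(channel, exchange, properties=None, errors=(Exception,)):
    """Return publish(routing_key, body) -> bool for a channel in confirm mode.

    Hypothetical helper. `errors` lists the exception types that mean
    "not delivered"; with pika 1.x that would be UnroutableError and NackError.
    """
    def publish(routing_key, body):
        try:
            result = channel.basic_publish(exchange=exchange,
                                           routing_key=routing_key,
                                           body=body,
                                           properties=properties,
                                           mandatory=True)  # report unroutable messages
        except errors:
            return False
        # pika 0.x returns True/False; pika 1.x returns None on success.
        return result is not False
    return publish
```

With this, the loop body can stay as `if publish(routing_key, message_body.strip()):` regardless of the installed pika version.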

Or as a brute-force approach, you can use a while loop instead of an if to ensure all of your messages are sent:

count = 0
for json_string in open(json_file, 'r'):
    result_json = json.loads(json_string)
    message_body = json.dumps(result_json['body'])
    routing_key = result_json['RoutingKey']
    while not channel.basic_publish(exchange=self.output_exchange_name,
                                    routing_key=routing_key,
                                    body=message_body.strip(),
                                    properties=pika.BasicProperties(delivery_mode=2)):
        pass  # Retry until confirmed; you could also count retries here
    count += 1
self.logger.info('Sent %d messages' % count)
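
Note that a message that cannot be routed will never be confirmed, so an unbounded loop can spin forever. A bounded variant might look like the sketch below; publish_with_retry and its parameters are hypothetical, and publish is assumed to be any zero-argument callable that returns True once the broker has confirmed the message.

```python
import time

def publish_with_retry(publish, max_attempts=5, delay=0.5, sleep=time.sleep):
    """Call publish() until it returns True, at most max_attempts times.

    Returns the number of attempts used, or 0 if every attempt failed.
    """
    for attempt in range(1, max_attempts + 1):
        if publish():
            return attempt
        if attempt < max_attempts:
            sleep(delay)  # brief pause before trying again
    return 0
```

In the loop above, publish could be a lambda wrapping the basic_publish call, and a return value of 0 is the signal to log or store the message instead of counting it as sent.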

dgonzalez answered Sep 22 '22 00:09